diff --git a/.gitignore b/.gitignore index dafefef7..669e887f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,25 @@ -build/ -out/ -.gradle/ +# ## RethinkDB .gitignore + +# General .#* + +# IDE-related *.iml .idea/ +.vscode/ +out/ + +# Java-related +*.class + +# Gradle-related +build/ +.gradle/ confidential.properties +# Kotlin scripts +scripts/kotlinc/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] @@ -27,7 +41,4 @@ virtualenv/ # RethinkDB scripts/*.proto - -# Editors -.vscode/ -.idea/ +test-dburl-override.txt \ No newline at end of file diff --git a/DEPLOYING.md b/DEPLOYING.md index 0055bbfb..1c1f400f 100644 --- a/DEPLOYING.md +++ b/DEPLOYING.md @@ -32,3 +32,7 @@ To upload a new release directly to Sonatype, run the Gradle task `uploadArchive After release, you may need to go to https://oss.sonatype.org/#stagingRepositories and search for "rethinkdb" in the search box, find the release that is in status `open`. Select it and then click the `Close` button. This will check it and make it ready for release. If that stage passes you can click the `Release` button. For full instructions see: http://central.sonatype.org/pages/releasing-the-deployment.html + +## After deploying: Documentations + +After deploying, the following file must be updated to reflect the new version: https://github.com/rethinkdb/docs/blob/master/0-getting-started/drivers/java.md \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 2129a595..e2721f1e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -13,7 +13,7 @@ plugins { id("com.jfrog.bintray") version "1.8.4" } -version = "2.4.2" +version = "2.4.3" group = "com.rethinkdb" java.sourceCompatibility = JavaVersion.VERSION_1_8 diff --git a/src/main/java/com/rethinkdb/RethinkDB.java b/src/main/java/com/rethinkdb/RethinkDB.java index d6216dbe..79d65668 100644 --- a/src/main/java/com/rethinkdb/RethinkDB.java +++ b/src/main/java/com/rethinkdb/RethinkDB.java @@ -14,7 +14,7 @@ */ public class RethinkDB extends TopLevel { /** - * The Singleton to use to begin interacting with RethinkDB Driver + * The Singleton to use to begin interacting with RethinkDB Driver. */ public static final RethinkDB r = new RethinkDB(); /** @@ -25,7 +25,7 @@ public class RethinkDB extends TopLevel { /** * Gets (or creates, if null) the {@link ObjectMapper} for handling {@link com.rethinkdb.net.Result}'s values. * - * @return the {@link com.rethinkdb.net.Result}'s {@link ObjectMapper} + * @return the {@link com.rethinkdb.net.Result}'s {@link ObjectMapper}. */ public synchronized static @NotNull ObjectMapper getResultMapper() { ObjectMapper mapper = resultMapper; @@ -40,7 +40,7 @@ public class RethinkDB extends TopLevel { /** * Sets the {@link ObjectMapper} for handling {@link com.rethinkdb.net.Result}'s values. * - * @param mapper an {@link ObjectMapper}, or null + * @param mapper an {@link ObjectMapper}, or null. */ public synchronized static void setResultMapper(@Nullable ObjectMapper mapper) { resultMapper = mapper; @@ -49,7 +49,7 @@ public synchronized static void setResultMapper(@Nullable ObjectMapper mapper) { /** * Creates a new connection builder. * - * @return a newly created {@link Connection.Builder} + * @return a newly created {@link Connection.Builder}. */ public @NotNull Connection.Builder connection() { return new Connection.Builder(); @@ -59,7 +59,7 @@ public synchronized static void setResultMapper(@Nullable ObjectMapper mapper) { * Creates a new connection builder and configures it with a db-url. * * @param dburl the db-url to configure the builder. - * @return a newly created {@link Connection.Builder} + * @return a newly created {@link Connection.Builder}. */ public @NotNull Connection.Builder connection(@NotNull String dburl) { return connection(URI.create(dburl)); @@ -69,9 +69,20 @@ public synchronized static void setResultMapper(@Nullable ObjectMapper mapper) { * Creates a new connection builder and configures it with a db-url. * * @param uri the db-url to configure the builder. - * @return a newly created {@link Connection.Builder} + * @return a newly created {@link Connection.Builder}. */ public @NotNull Connection.Builder connection(@NotNull URI uri) { return new Connection.Builder(uri); } + + /** + * Copies a connection builder. + * + * @param b the original builder. + * @return a copy of the {@link Connection.Builder}. + */ + public @NotNull Connection.Builder connection(Connection.Builder b) { + return new Connection.Builder(b); + } + } diff --git a/src/main/java/com/rethinkdb/ast/Query.java b/src/main/java/com/rethinkdb/ast/Query.java index 0056e2cd..d3d5a9ed 100644 --- a/src/main/java/com/rethinkdb/ast/Query.java +++ b/src/main/java/com/rethinkdb/ast/Query.java @@ -1,10 +1,10 @@ package com.rethinkdb.ast; -import com.rethinkdb.RethinkDB; import com.rethinkdb.gen.exc.ReqlRuntimeError; import com.rethinkdb.gen.proto.QueryType; import com.rethinkdb.model.OptArgs; import com.rethinkdb.utils.Internals; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,21 +22,20 @@ public class Query { private static final Logger LOGGER = LoggerFactory.getLogger(Query.class); - public final QueryType type; + public final @NotNull QueryType type; public final long token; - public final OptArgs globalOptions; - public final @Nullable ReqlAst term; + public final @Nullable OptArgs globalOptions; - public Query(QueryType type, long token, @Nullable ReqlAst term, OptArgs globalOptions) { + public Query(@NotNull QueryType type, long token, @Nullable ReqlAst term, @Nullable OptArgs globalOptions) { this.type = type; this.token = token; this.term = term; this.globalOptions = globalOptions; } - public Query(QueryType type, long token) { - this(type, token, null, new OptArgs()); + public Query(@NotNull QueryType type, long token) { + this(type, token, null, null); } public ByteBuffer serialize() { @@ -47,8 +46,8 @@ public ByteBuffer serialize() { if (term != null) { list.add(term.build()); } - if (!globalOptions.isEmpty()) { - list.add(ReqlAst.buildOptarg(globalOptions)); + if (globalOptions != null && !globalOptions.isEmpty()) { + list.add(ReqlAst.buildToMap(globalOptions)); } String json = Internals.getInternalMapper().writeValueAsString(list); byte[] bytes = json.getBytes(StandardCharsets.UTF_8); diff --git a/src/main/java/com/rethinkdb/ast/ReqlAst.java b/src/main/java/com/rethinkdb/ast/ReqlAst.java index dece91ad..d7d58411 100644 --- a/src/main/java/com/rethinkdb/ast/ReqlAst.java +++ b/src/main/java/com/rethinkdb/ast/ReqlAst.java @@ -4,12 +4,16 @@ import com.rethinkdb.gen.ast.Binary; import com.rethinkdb.gen.ast.Datum; import com.rethinkdb.gen.exc.ReqlDriverError; +import com.rethinkdb.gen.proto.ResponseType; import com.rethinkdb.gen.proto.TermType; import com.rethinkdb.model.Arguments; +import com.rethinkdb.model.GroupedResult; import com.rethinkdb.model.OptArgs; import com.rethinkdb.net.Connection; import com.rethinkdb.net.Result; import com.rethinkdb.utils.Types; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.*; import java.util.Map.Entry; @@ -20,113 +24,1383 @@ * Base class for all ReQL queries. */ public class ReqlAst { - protected final TermType termType; - protected final Arguments args; - protected final OptArgs optargs; + protected final @NotNull TermType termType; + protected final @Nullable Arguments args; + protected final @Nullable OptArgs optargs; - protected ReqlAst(TermType termType, Arguments args, OptArgs optargs) { - if (termType == null) { - throw new ReqlDriverError("termType can't be null!"); - } + protected ReqlAst(@NotNull TermType termType, @Nullable Arguments args, @Nullable OptArgs optargs) { + //if (termType == null) { + // throw new ReqlDriverError("termType can't be null!"); + //} this.termType = termType; - this.args = args != null ? args : new Arguments(); - this.optargs = optargs != null ? optargs : new OptArgs(); + this.args = args; + this.optargs = optargs; + } + + protected Object build() { + // Create a JSON object from the AST + // set initial capacity to max size possible, avoids resizing + List list = new ArrayList<>(3); + list.add(termType.value); + list.add(args == null || args.isEmpty() ? Collections.emptyList() : args.stream().map(ReqlAst::build).collect(Collectors.toList())); + if (optargs != null && !optargs.isEmpty()) { + list.add(buildToMap(optargs)); + } + return list; + } + + /** + * Runs this query via connection {@code conn} with default options and returns the result. + * + * @param conn The connection to run this query + * @return The result of this query + */ + public Result run(Connection conn) { + return conn.run(this, new OptArgs(), null, null, null); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result. + * + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @return The result of this query + */ + public Result run(Connection conn, OptArgs runOpts) { + return conn.run(this, runOpts, null, null, null); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} + * and returns the result. + * + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Result run(Connection conn, Result.FetchMode fetchMode) { + return conn.run(this, new OptArgs(), fetchMode, null, null); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the result, with the values + * converted to the type of {@code Class}. + * + * @param The result type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query + */ + public Result run(Connection conn, Class typeRef) { + return conn.run(this, new OptArgs(), null, null, Types.of(typeRef)); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the result, with the values + * converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query (either a {@code P or a Cursor

} + */ + public Result run(Connection conn, TypeReference typeRef) { + return conn.run(this, new OptArgs(), null, null, typeRef); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} and + * returns the result. + * + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Result run(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { + return conn.run(this, runOpts, fetchMode, null, null); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result, with the values + * converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public Result run(Connection conn, OptArgs runOpts, Class typeRef) { + return conn.run(this, runOpts, null, null, Types.of(typeRef)); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result, with the values + * converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public Result run(Connection conn, OptArgs runOpts, TypeReference typeRef) { + return conn.run(this, runOpts, null, null, typeRef); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the result, with + * the values converted to the type of {@code Class}. + * + * @param The type of result + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public Result run(Connection conn, Result.FetchMode fetchMode, Class typeRef) { + return conn.run(this, new OptArgs(), fetchMode, null, Types.of(typeRef)); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the result, with + * the values converted to the type of {@code TypeReference}. + * + * @param The type of result + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public Result run(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.run(this, new OptArgs(), fetchMode, null, typeRef); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the result, with the values converted to the type of {@code Class}. + * + * @param The type of result + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public Result run(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { + return conn.run(this, runOpts, fetchMode, null, Types.of(typeRef)); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the result, with the values converted to the type of {@code TypeReference}. + * + * @param The type of result + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public Result run(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.run(this, runOpts, fetchMode, null, typeRef); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the result asynchronously. + * + * @param conn The connection to run this query + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn) { + return conn.runAsync(this, new OptArgs(), null, null, null); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result asynchronously. + * + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, OptArgs runOpts) { + return conn.runAsync(this, runOpts, null, null, null); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} + * and returns the result asynchronously. + * + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, Result.FetchMode fetchMode) { + return conn.runAsync(this, new OptArgs(), fetchMode, null, null); + } + + + /** + * Runs this query via connection {@code conn} with default options and returns the result asynchronously, with the + * values converted to the type of {@code Class}. + * + * @param The result type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, Class typeRef) { + return conn.runAsync(this, new OptArgs(), null, null, Types.of(typeRef)); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the result asynchronously, with the + * values converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query (either a {@code P or a Cursor

} + */ + public CompletableFuture> runAsync(Connection conn, TypeReference typeRef) { + return conn.runAsync(this, new OptArgs(), null, null, typeRef); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} and + * returns the result asynchronously. + * + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { + return conn.runAsync(this, runOpts, fetchMode, null, null); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result asynchronously, + * with the values converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, Class typeRef) { + return conn.runAsync(this, runOpts, null, null, Types.of(typeRef)); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result asynchronously, + * with the values converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, TypeReference typeRef) { + return conn.runAsync(this, runOpts, null, null, typeRef); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the result + * asynchronously, with the values converted to the type of {@code Class}. + * + * @param The type of result + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, Result.FetchMode fetchMode, Class typeRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, null, Types.of(typeRef)); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the result + * asynchronously, with the values converted to the type of {@code TypeReference}. + * + * @param The type of result + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, null, typeRef); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the result asynchronously, with the values converted to the type of {@code Class}. + * + * @param The type of result + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { + return conn.runAsync(this, runOpts, fetchMode, null, Types.of(typeRef)); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the result asynchronously, with the values converted to the type of {@code TypeReference}. + * + * @param The type of result + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.runAsync(this, runOpts, fetchMode, null, typeRef); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped atom. + * + * @param conn The connection to run this query + * @return The result of this query + */ + public Object runAtom(Connection conn) { + return handleAtom(conn.run(this, new OptArgs(), null, false, null)); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped atom. + * + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @return The result of this query + */ + public Object runAtom(Connection conn, OptArgs runOpts) { + return handleAtom(conn.run(this, runOpts, null, false, null)); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} + * and returns the unwrapped atom. + * + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Object runAtom(Connection conn, Result.FetchMode fetchMode) { + return handleAtom(conn.run(this, new OptArgs(), fetchMode, false, null)); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped atom, with the values + * converted to the type of {@code Class}. + * + * @param The result type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query + */ + public T runAtom(Connection conn, Class typeRef) { + return handleAtom(conn.run(this, new OptArgs(), null, false, Types.of(typeRef))); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped atom, with the values + * converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query (either a {@code P or a Cursor

} + */ + public T runAtom(Connection conn, TypeReference typeRef) { + return handleAtom(conn.run(this, new OptArgs(), null, false, typeRef)); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} and + * returns the unwrapped atom. + * + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Object runAtom(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { + return handleAtom(conn.run(this, runOpts, fetchMode, false, null)); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped atom, + * with the values converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public T runAtom(Connection conn, OptArgs runOpts, Class typeRef) { + return handleAtom(conn.run(this, runOpts, null, false, Types.of(typeRef))); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped atom, + * with the values converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public T runAtom(Connection conn, OptArgs runOpts, TypeReference typeRef) { + return handleAtom(conn.run(this, runOpts, null, false, typeRef)); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped atom, + * with the values converted to the type of {@code Class}. + * + * @param The type of result + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public T runAtom(Connection conn, Result.FetchMode fetchMode, Class typeRef) { + return handleAtom(conn.run(this, new OptArgs(), fetchMode, false, Types.of(typeRef))); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped atom, + * with the values converted to the type of {@code TypeReference}. + * + * @param The type of result + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public T runAtom(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { + return handleAtom(conn.run(this, new OptArgs(), fetchMode, false, typeRef)); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped atom, with the values converted to the type of {@code Class}. + * + * @param The type of result + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public T runAtom(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { + return handleAtom(conn.run(this, runOpts, fetchMode, false, Types.of(typeRef))); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped atom, with the values converted to the type of {@code TypeReference}. + * + * @param The type of result + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public T runAtom(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { + return handleAtom(conn.run(this, runOpts, fetchMode, false, typeRef)); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped atom asynchronously. + * + * @param conn The connection to run this query + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn) { + return conn.runAsync(this, new OptArgs(), null, false, null).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped atom + * asynchronously. + * + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts) { + return conn.runAsync(this, runOpts, null, false, null).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} + * and returns the unwrapped atom asynchronously. + * + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, Result.FetchMode fetchMode) { + return conn.runAsync(this, new OptArgs(), fetchMode, false, null).thenApply(ReqlAst::handleAtom); + } + + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped atom asynchronously, + * with the values converted to the type of {@code Class}. + * + * @param The result type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, Class typeRef) { + return conn.runAsync(this, new OptArgs(), null, false, Types.of(typeRef)).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped atom asynchronously, + * with the values converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query (either a {@code P or a Cursor

} + */ + public CompletableFuture runAtomAsync(Connection conn, TypeReference typeRef) { + return conn.runAsync(this, new OptArgs(), null, false, typeRef).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} and + * returns the unwrapped atom asynchronously. + * + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { + return conn.runAsync(this, runOpts, fetchMode, false, null).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped atom + * asynchronously, with the values converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, Class typeRef) { + return conn.runAsync(this, runOpts, null, false, Types.of(typeRef)).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped atom + * asynchronously, with the values converted to the type of {@code TypeReference}. + * + * @param The result type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, TypeReference typeRef) { + return conn.runAsync(this, runOpts, null, false, typeRef).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped atom + * asynchronously, with the values converted to the type of {@code Class}. + * + * @param The type of result + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, Result.FetchMode fetchMode, Class typeRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, false, Types.of(typeRef)).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped atom + * asynchronously, with the values converted to the type of {@code TypeReference}. + * + * @param The type of result + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, false, typeRef).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped atom asynchronously, with the values converted to the type of {@code Class}. + * + * @param The type of result + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { + return conn.runAsync(this, runOpts, fetchMode, false, Types.of(typeRef)).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped atom asynchronously, with the values converted to the type of {@code TypeReference}. + * + * @param The type of result + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.runAsync(this, runOpts, fetchMode, false, typeRef).thenApply(ReqlAst::handleAtom); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result, with + * the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @return The result of this query + */ + public Map> runGrouping(Connection conn, Class keyRef, Class valueRef) { + return handleGrouping(conn.run(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result, with + * the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @return The result of this query + */ + public Map> runGrouping(Connection conn, TypeReference keyRef, Class valueRef) { + return handleGrouping(conn.run(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result, with + * the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @return The result of this query (either a {@code P or a Cursor

} + */ + public Map> runGrouping(Connection conn, Class keyRef, TypeReference valueRef) { + return handleGrouping(conn.run(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result, with + * the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @return The result of this query (either a {@code P or a Cursor

} + */ + public Map> runGrouping(Connection conn, TypeReference keyRef, TypeReference valueRef) { + return handleGrouping(conn.run(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result, with + * the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query (either a {@code P or a Cursor

} + */ + public Map> runGrouping(Connection conn, TypeReference> typeRef) { + return handleGrouping(conn.run(this, new OptArgs(), null, true, typeRef)); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, Class keyRef, Class valueRef) { + return handleGrouping(conn.run(this, runOpts, null, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, TypeReference keyRef, Class valueRef) { + return handleGrouping(conn.run(this, runOpts, null, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, Class keyRef, TypeReference valueRef) { + return handleGrouping(conn.run(this, runOpts, null, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, TypeReference keyRef, TypeReference valueRef) { + return handleGrouping(conn.run(this, runOpts, null, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, TypeReference> typeRef) { + return handleGrouping(conn.run(this, runOpts, null, true, typeRef)); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Map> runGrouping(Connection conn, Result.FetchMode fetchMode, Class keyRef, Class valueRef) { + return handleGrouping(conn.run(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Map> runGrouping(Connection conn, Result.FetchMode fetchMode, TypeReference keyRef, Class valueRef) { + return handleGrouping(conn.run(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Map> runGrouping(Connection conn, Result.FetchMode fetchMode, Class keyRef, TypeReference valueRef) { + return handleGrouping(conn.run(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Map> runGrouping(Connection conn, Result.FetchMode fetchMode, TypeReference keyRef, TypeReference valueRef) { + return handleGrouping(conn.run(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result, + * with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param typeRef The type to convert to + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Map> runGrouping(Connection conn, Result.FetchMode fetchMode, TypeReference> typeRef) { + return handleGrouping(conn.run(this, new OptArgs(), fetchMode, true, typeRef)); } - public static Map buildOptarg(OptArgs opts) { - Map result = new LinkedHashMap<>(opts.size()); - opts.forEach((name, arg) -> result.put(name, arg.build())); - return result; + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class keyRef, Class valueRef) { + return handleGrouping(conn.run(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef))); } - protected Object build() { - // Create a JSON object from the Ast - // set initial capacity to max size possible, avoids resizing - List list = new ArrayList<>(3); - list.add(termType.value); - list.add(args.isEmpty() ? Collections.emptyList() : args.stream().map(ReqlAst::build).collect(Collectors.toList())); - if (optargs.size() > 0) { - list.add(buildOptarg(optargs)); - } - return list; + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference keyRef, Class valueRef) { + return handleGrouping(conn.run(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class keyRef, TypeReference valueRef) { + return handleGrouping(conn.run(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference keyRef, TypeReference valueRef) { + return handleGrouping(conn.run(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef))); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public Map> runGrouping(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference> typeRef) { + return handleGrouping(conn.run(this, runOpts, fetchMode, true, typeRef)); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, Class keyRef, Class valueRef) { + return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, TypeReference keyRef, Class valueRef) { + return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query (either a {@code P or a Cursor

} + */ + public CompletableFuture>> runGroupingAsync(Connection conn, Class keyRef, TypeReference valueRef) { + return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query (either a {@code P or a Cursor

} + */ + public CompletableFuture>> runGroupingAsync(Connection conn, TypeReference keyRef, TypeReference valueRef) { + return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with default options and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param typeRef The type to convert to + * @return The result of this query (either a {@code P or a Cursor

} + */ + public CompletableFuture>> runGroupingAsync(Connection conn, TypeReference> typeRef) { + return conn.runAsync(this, new OptArgs(), null, true, typeRef).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Class keyRef, Class valueRef) { + return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, TypeReference keyRef, Class valueRef) { + return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Class keyRef, TypeReference valueRef) { + return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, TypeReference keyRef, TypeReference valueRef) { + return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, TypeReference> typeRef) { + return conn.runAsync(this, runOpts, null, true, typeRef).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, Class keyRef, Class valueRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, TypeReference keyRef, Class valueRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, Class keyRef, TypeReference valueRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, TypeReference keyRef, TypeReference valueRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the unwrapped grouping result + * asynchronously, with the values converted to the defined grouping and value types. + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, TypeReference> typeRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, true, typeRef).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result asynchronously, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class keyRef, Class valueRef) { + return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result asynchronously, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference keyRef, Class valueRef) { + return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result asynchronously, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class keyRef, TypeReference valueRef) { + return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result asynchronously, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param keyRef The grouping type to convert to + * @param valueRef The value type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference keyRef, TypeReference valueRef) { + return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); + } + + /** + * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} + * and returns the unwrapped grouping result asynchronously, with the values converted to the defined grouping and value types + * + * @param The grouping type + * @param The value type + * @param conn The connection to run this query + * @param runOpts The options to run this query with + * @param fetchMode The fetch mode to use in partial sequences + * @param typeRef The type to convert to + * @return The result of this query + */ + public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference> typeRef) { + return conn.runAsync(this, runOpts, fetchMode, true, typeRef).thenApply(ReqlAst::handleGrouping); } /** * Runs this query via connection {@code conn} with default options and returns the result. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence. * * @param conn The connection to run this query * @return The result of this query */ - public Result run(Connection conn) { - return conn.run(this, new OptArgs(), null, null); + public Result runUnwrapping(Connection conn) { + return conn.run(this, new OptArgs(), null, true, null); } /** * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence. * * @param conn The connection to run this query * @param runOpts The options to run this query with * @return The result of this query */ - public Result run(Connection conn, OptArgs runOpts) { - return conn.run(this, runOpts, null, null); + public Result runUnwrapping(Connection conn, OptArgs runOpts) { + return conn.run(this, runOpts, null, true, null); } /** * Runs this query via connection {@code conn} with the specified {@code fetchMode} * and returns the result. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence. * * @param conn The connection to run this query * @param fetchMode The fetch mode to use in partial sequences * @return The result of this query */ - public Result run(Connection conn, Result.FetchMode fetchMode) { - return conn.run(this, new OptArgs(), fetchMode, null); + public Result runUnwrapping(Connection conn, Result.FetchMode fetchMode) { + return conn.run(this, new OptArgs(), fetchMode, true, null); } - /** * Runs this query via connection {@code conn} with default options and returns the result, with the values * converted to the type of {@code Class}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The result type * @param conn The connection to run this query * @param typeRef The type to convert to * @return The result of this query */ - public Result run(Connection conn, Class typeRef) { - return conn.run(this, new OptArgs(), null, Types.of(typeRef)); + public Result runUnwrapping(Connection conn, Class typeRef) { + return conn.run(this, new OptArgs(), null, true, Types.of(typeRef)); } /** * Runs this query via connection {@code conn} with default options and returns the result, with the values * converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The result type * @param conn The connection to run this query * @param typeRef The type to convert to * @return The result of this query (either a {@code P or a Cursor

} */ - public Result run(Connection conn, TypeReference typeRef) { - return conn.run(this, new OptArgs(), null, typeRef); + public Result runUnwrapping(Connection conn, TypeReference typeRef) { + return conn.run(this, new OptArgs(), null, true, typeRef); } /** * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} and * returns the result. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence. * * @param conn The connection to run this query * @param runOpts The options to run this query with * @param fetchMode The fetch mode to use in partial sequences * @return The result of this query */ - public Result run(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { - return conn.run(this, runOpts, fetchMode, null); + public Result runUnwrapping(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { + return conn.run(this, runOpts, fetchMode, true, null); } /** * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result, with the values * converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The result type * @param conn The connection to run this query @@ -134,13 +1408,15 @@ public Result run(Connection conn, OptArgs runOpts, Result.FetchMode fet * @param typeRef The type to convert to * @return The result of this query */ - public Result run(Connection conn, OptArgs runOpts, Class typeRef) { - return conn.run(this, runOpts, null, Types.of(typeRef)); + public Result runUnwrapping(Connection conn, OptArgs runOpts, Class typeRef) { + return conn.run(this, runOpts, null, true, Types.of(typeRef)); } /** * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result, with the values * converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The result type * @param conn The connection to run this query @@ -148,13 +1424,15 @@ public Result run(Connection conn, OptArgs runOpts, Class typeRef) { * @param typeRef The type to convert to * @return The result of this query */ - public Result run(Connection conn, OptArgs runOpts, TypeReference typeRef) { - return conn.run(this, runOpts, null, typeRef); + public Result runUnwrapping(Connection conn, OptArgs runOpts, TypeReference typeRef) { + return conn.run(this, runOpts, null, true, typeRef); } /** * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the result, with * the values converted to the type of {@code Class}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The type of result * @param conn The connection to run this query @@ -162,13 +1440,15 @@ public Result run(Connection conn, OptArgs runOpts, TypeReference type * @param typeRef The type to convert to * @return The result of this query */ - public Result run(Connection conn, Result.FetchMode fetchMode, Class typeRef) { - return conn.run(this, new OptArgs(), fetchMode, Types.of(typeRef)); + public Result runUnwrapping(Connection conn, Result.FetchMode fetchMode, Class typeRef) { + return conn.run(this, new OptArgs(), fetchMode, true, Types.of(typeRef)); } /** * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the result, with * the values converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The type of result * @param conn The connection to run this query @@ -176,13 +1456,15 @@ public Result run(Connection conn, Result.FetchMode fetchMode, Class t * @param typeRef The type to convert to * @return The result of this query */ - public Result run(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { - return conn.run(this, new OptArgs(), fetchMode, typeRef); + public Result runUnwrapping(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.run(this, new OptArgs(), fetchMode, true, typeRef); } /** * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} - * and returns the result, with the values converted to the type of {@code Class} + * and returns the result, with the values converted to the type of {@code Class}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The type of result * @param conn The connection to run this query @@ -191,13 +1473,15 @@ public Result run(Connection conn, Result.FetchMode fetchMode, TypeRefere * @param typeRef The type to convert to * @return The result of this query */ - public Result run(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { - return conn.run(this, runOpts, fetchMode, Types.of(typeRef)); + public Result runUnwrapping(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { + return conn.run(this, runOpts, fetchMode, true, Types.of(typeRef)); } /** * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} - * and returns the result, with the values converted to the type of {@code TypeReference} + * and returns the result, with the values converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The type of result * @param conn The connection to run this query @@ -206,8 +1490,8 @@ public Result run(Connection conn, OptArgs runOpts, Result.FetchMode fetc * @param typeRef The type to convert to * @return The result of this query */ - public Result run(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { - return conn.run(this, runOpts, fetchMode, typeRef); + public Result runUnwrapping(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.run(this, runOpts, fetchMode, true, typeRef); } /** @@ -216,76 +1500,85 @@ public Result run(Connection conn, OptArgs runOpts, Result.FetchMode fetc * @param conn The connection to run this query * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn) { - return conn.runAsync(this, new OptArgs(), null, null); + public CompletableFuture> runUnwrappingAsync(Connection conn) { + return conn.runAsync(this, new OptArgs(), null, true, null); } /** * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result asynchronously. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence. * * @param conn The connection to run this query * @param runOpts The options to run this query with * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, OptArgs runOpts) { - return conn.runAsync(this, runOpts, null, null); + public CompletableFuture> runUnwrappingAsync(Connection conn, OptArgs runOpts) { + return conn.runAsync(this, runOpts, null, true, null); } /** * Runs this query via connection {@code conn} with the specified {@code fetchMode} * and returns the result asynchronously. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence. * * @param conn The connection to run this query * @param fetchMode The fetch mode to use in partial sequences * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, Result.FetchMode fetchMode) { - return conn.runAsync(this, new OptArgs(), fetchMode, null); + public CompletableFuture> runUnwrappingAsync(Connection conn, Result.FetchMode fetchMode) { + return conn.runAsync(this, new OptArgs(), fetchMode, true, null); } /** * Runs this query via connection {@code conn} with default options and returns the result asynchronously, with the * values converted to the type of {@code Class}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The result type * @param conn The connection to run this query * @param typeRef The type to convert to * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, Class typeRef) { - return conn.runAsync(this, new OptArgs(), null, Types.of(typeRef)); + public CompletableFuture> runUnwrappingAsync(Connection conn, Class typeRef) { + return conn.runAsync(this, new OptArgs(), null, true, Types.of(typeRef)); } /** * Runs this query via connection {@code conn} with default options and returns the result asynchronously, with the * values converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The result type * @param conn The connection to run this query * @param typeRef The type to convert to * @return The result of this query (either a {@code P or a Cursor

} */ - public CompletableFuture> runAsync(Connection conn, TypeReference typeRef) { - return conn.runAsync(this, new OptArgs(), null, typeRef); + public CompletableFuture> runUnwrappingAsync(Connection conn, TypeReference typeRef) { + return conn.runAsync(this, new OptArgs(), null, true, typeRef); } /** * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} and * returns the result asynchronously. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence. * * @param conn The connection to run this query * @param runOpts The options to run this query with * @param fetchMode The fetch mode to use in partial sequences * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { - return conn.runAsync(this, runOpts, fetchMode, null); + public CompletableFuture> runUnwrappingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { + return conn.runAsync(this, runOpts, fetchMode, true, null); } /** * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result asynchronously, * with the values converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The result type * @param conn The connection to run this query @@ -293,13 +1586,15 @@ public CompletableFuture> runAsync(Connection conn, OptArgs runOp * @param typeRef The type to convert to * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, Class typeRef) { - return conn.runAsync(this, runOpts, null, Types.of(typeRef)); + public CompletableFuture> runUnwrappingAsync(Connection conn, OptArgs runOpts, Class typeRef) { + return conn.runAsync(this, runOpts, null, true, Types.of(typeRef)); } /** * Runs this query via connection {@code conn} with options {@code runOpts} and returns the result asynchronously, * with the values converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The result type * @param conn The connection to run this query @@ -307,13 +1602,15 @@ public CompletableFuture> runAsync(Connection conn, OptArgs runOpt * @param typeRef The type to convert to * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, TypeReference typeRef) { - return conn.runAsync(this, runOpts, null, typeRef); + public CompletableFuture> runUnwrappingAsync(Connection conn, OptArgs runOpts, TypeReference typeRef) { + return conn.runAsync(this, runOpts, null, true, typeRef); } /** * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the result * asynchronously, with the values converted to the type of {@code Class}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The type of result * @param conn The connection to run this query @@ -321,13 +1618,15 @@ public CompletableFuture> runAsync(Connection conn, OptArgs runOpt * @param typeRef The type to convert to * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, Result.FetchMode fetchMode, Class typeRef) { - return conn.runAsync(this, new OptArgs(), fetchMode, Types.of(typeRef)); + public CompletableFuture> runUnwrappingAsync(Connection conn, Result.FetchMode fetchMode, Class typeRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.of(typeRef)); } /** * Runs this query via connection {@code conn} with the specified {@code fetchMode} and returns the result * asynchronously, with the values converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The type of result * @param conn The connection to run this query @@ -335,13 +1634,15 @@ public CompletableFuture> runAsync(Connection conn, Result.FetchMo * @param typeRef The type to convert to * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { - return conn.runAsync(this, new OptArgs(), fetchMode, typeRef); + public CompletableFuture> runUnwrappingAsync(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.runAsync(this, new OptArgs(), fetchMode, true, typeRef); } /** * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} - * and returns the result asynchronously, with the values converted to the type of {@code Class} + * and returns the result asynchronously, with the values converted to the type of {@code Class}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The type of result * @param conn The connection to run this query @@ -350,13 +1651,15 @@ public CompletableFuture> runAsync(Connection conn, Result.FetchMo * @param typeRef The type to convert to * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { - return conn.runAsync(this, runOpts, fetchMode, Types.of(typeRef)); + public CompletableFuture> runUnwrappingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { + return conn.runAsync(this, runOpts, fetchMode, true, Types.of(typeRef)); } /** * Runs this query via connection {@code conn} with options {@code runOpts}, the specified {@code fetchMode} - * and returns the result asynchronously, with the values converted to the type of {@code TypeReference} + * and returns the result asynchronously, with the values converted to the type of {@code TypeReference}. + * If the query returns an atom which is an array, it'll unwrap the atom as if it were a completed sequence, + * and the type conversion will be applied to each element of the array instead on the array as a whole. * * @param The type of result * @param conn The connection to run this query @@ -365,8 +1668,8 @@ public CompletableFuture> runAsync(Connection conn, OptArgs runOpt * @param typeRef The type to convert to * @return The result of this query */ - public CompletableFuture> runAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { - return conn.runAsync(this, runOpts, fetchMode, typeRef); + public CompletableFuture> runUnwrappingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { + return conn.runAsync(this, runOpts, fetchMode, true, typeRef); } /** @@ -419,15 +1722,17 @@ private void astToString(StringBuilder builder, String name, String indent, bool builder.append('(').append(length).append(length != 1 ? " bytes" : " byte").append(")"); } builder.append('\n'); - Iterator argsIterator = args.iterator(); - while (argsIterator.hasNext()) { - ReqlAst arg = argsIterator.next(); - arg.astToString(builder, null, - indent + (tail ? " " : "│ "), - !argsIterator.hasNext() && optargs.isEmpty()); + if (args != null) { + Iterator argsIterator = args.iterator(); + while (argsIterator.hasNext()) { + ReqlAst arg = argsIterator.next(); + arg.astToString(builder, null, + indent + (tail ? " " : "│ "), + !argsIterator.hasNext() && (optargs == null || optargs.isEmpty())); + } } - if (!optargs.isEmpty()) { + if (optargs != null && !optargs.isEmpty()) { builder.append(indent).append(tail ? " " : "│ ").append("└── ").append(": \n"); Iterator> optIterator = optargs.entrySet().iterator(); while (optIterator.hasNext()) { @@ -438,4 +1743,22 @@ private void astToString(StringBuilder builder, String name, String indent, bool } } } + + static Map buildToMap(OptArgs opts) { + Map result = new LinkedHashMap<>(opts.size()); + opts.forEach((name, arg) -> result.put(name, arg.build())); + return result; + } + + private static T handleAtom(Result result) { + if (!result.responseType().equals(ResponseType.SUCCESS_ATOM)) { + throw new IllegalStateException("result is not an atom."); + } + + return result.single(); + } + + private static Map> handleGrouping(Result> result) { + return GroupedResult.toMap(result.toList()); + } } \ No newline at end of file diff --git a/src/main/java/com/rethinkdb/gen/model/TopLevel.java b/src/main/java/com/rethinkdb/gen/model/TopLevel.java index eb3ffb25..f12f6f1c 100644 --- a/src/main/java/com/rethinkdb/gen/model/TopLevel.java +++ b/src/main/java/com/rethinkdb/gen/model/TopLevel.java @@ -5,7 +5,6 @@ package com.rethinkdb.gen.model; -import com.rethinkdb.ast.ReqlAst; import com.rethinkdb.model.Arguments; import com.rethinkdb.model.MapObject; import com.rethinkdb.gen.ast.Error; @@ -13,7 +12,6 @@ import com.rethinkdb.gen.exc.ReqlDriverError; import com.rethinkdb.utils.Internals; -import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -29,7 +27,7 @@ public ReqlExpr row(Object... values) { " Use lambda syntax instead"); } - public static Object pathspec(Object... path) { + public Object pathspec(Object... path) { if (path.length < 2) { throw new ReqlDriverError("r.pathspec(...) requires at least two parameters."); } diff --git a/src/main/java/com/rethinkdb/model/GroupedResult.java b/src/main/java/com/rethinkdb/model/GroupedResult.java index c25c43d2..5348f178 100644 --- a/src/main/java/com/rethinkdb/model/GroupedResult.java +++ b/src/main/java/com/rethinkdb/model/GroupedResult.java @@ -3,10 +3,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; public class GroupedResult { @@ -30,4 +27,26 @@ public List getValues() { public static Map> toMap(List> list) { return list.stream().collect(Collectors.toMap(GroupedResult::getGroup, it -> new LinkedHashSet<>(it.getValues()))); } + + @Override + public String toString() { + return "GroupedResult{" + + "group=" + group + + ", values=" + values + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GroupedResult that = (GroupedResult) o; + return Objects.equals(group, that.group) && + Objects.equals(values, that.values); + } + + @Override + public int hashCode() { + return Objects.hash(group, values); + } } diff --git a/src/main/java/com/rethinkdb/model/OptArgs.java b/src/main/java/com/rethinkdb/model/OptArgs.java index d2d4535e..05175080 100644 --- a/src/main/java/com/rethinkdb/model/OptArgs.java +++ b/src/main/java/com/rethinkdb/model/OptArgs.java @@ -23,9 +23,12 @@ public OptArgs with(String key, List value) { } public static OptArgs fromMap(Map map) { - OptArgs oa = new OptArgs(); - oa.putAll(map); - return oa; + if (map == null) { + return new OptArgs(); + } + OptArgs args = new OptArgs(); + args.putAll(map); + return args; } public static OptArgs of(String key, Object val) { diff --git a/src/main/java/com/rethinkdb/net/Connection.java b/src/main/java/com/rethinkdb/net/Connection.java index cebc9550..aa6a70c2 100644 --- a/src/main/java/com/rethinkdb/net/Connection.java +++ b/src/main/java/com/rethinkdb/net/Connection.java @@ -6,27 +6,35 @@ import com.rethinkdb.gen.ast.Db; import com.rethinkdb.gen.exc.ReqlDriverError; import com.rethinkdb.gen.exc.ReqlError; +import com.rethinkdb.gen.model.TopLevel; import com.rethinkdb.gen.proto.ResponseType; -import com.rethinkdb.model.Arguments; import com.rethinkdb.model.OptArgs; import com.rethinkdb.model.Server; +import com.rethinkdb.net.Result.FetchMode; import com.rethinkdb.utils.Internals; import com.rethinkdb.utils.Types; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; import java.io.InputStream; import java.net.URI; +import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; /** * A single connection to RethinkDB. @@ -34,20 +42,24 @@ * This object is thread-safe. */ public class Connection implements Closeable { - protected final ConnectionSocket.Factory socketFactory; - protected final ResponsePump.Factory pumpFactory; - protected final String hostname; + private static final @NotNull Logger LOGGER = LoggerFactory.getLogger(Connection.class); + + protected final @NotNull String hostname; protected final int port; - protected final @Nullable SSLContext sslContext; - protected final @Nullable Long timeout; protected final @Nullable String user; protected final @Nullable String password; + protected final @Nullable Long timeout; + protected final @Nullable SSLContext sslContext; + //java-only + protected final @NotNull ConnectionSocket.Factory socketFactory; + protected final @NotNull ResponsePump.Factory pumpFactory; + protected final @NotNull FetchMode defaultFetchMode; protected final boolean unwrapLists; - protected final Result.FetchMode defaultFetchMode; + protected final boolean persistentThreads; - protected final AtomicLong nextToken = new AtomicLong(); - protected final Set> tracked = ConcurrentHashMap.newKeySet(); - protected final Lock writeLock = new ReentrantLock(); + protected final @NotNull AtomicLong nextToken = new AtomicLong(); + protected final @NotNull Set> tracked = ConcurrentHashMap.newKeySet(); + protected final @NotNull Lock writeLock = new ReentrantLock(); protected @Nullable String dbname; protected @Nullable ConnectionSocket socket; @@ -58,21 +70,20 @@ public class Connection implements Closeable { * * @param b the connection builder */ - public Connection(Builder b) { - if (b.authKey != null && b.user != null) { - throw new ReqlDriverError("Either `authKey` or `user` can be used, but not both."); - } - this.socketFactory = b.socketFactory != null ? b.socketFactory : DefaultConnectionFactory.INSTANCE; - this.pumpFactory = b.pumpFactory != null ? b.pumpFactory : DefaultConnectionFactory.INSTANCE; - this.hostname = b.hostname != null ? b.hostname : "localhost"; + public Connection(@NotNull Builder b) { + this.hostname = b.hostname != null ? b.hostname : "127.0.0.1"; this.port = b.port != null ? b.port : 28015; + this.user = b.user != null ? b.user : "admin"; + this.password = b.password != null ? b.password : ""; this.dbname = b.dbname; - this.sslContext = b.sslContext; this.timeout = b.timeout; - this.user = b.user != null ? b.user : "admin"; - this.password = b.password != null ? b.password : b.authKey != null ? b.authKey : ""; + this.sslContext = b.sslContext; + //java-only + this.socketFactory = b.socketFactory != null ? b.socketFactory : DefaultConnectionFactory.INSTANCE; + this.pumpFactory = b.pumpFactory != null ? b.pumpFactory : DefaultConnectionFactory.INSTANCE; this.unwrapLists = b.unwrapLists; - this.defaultFetchMode = b.defaultFetchMode != null ? b.defaultFetchMode : Result.FetchMode.LAZY; + this.defaultFetchMode = b.defaultFetchMode != null ? b.defaultFetchMode : FetchMode.LAZY; + this.persistentThreads = b.persistentThreads; } /** @@ -106,20 +117,57 @@ public boolean isOpen() { return socket != null && socket.isOpen() && pump != null && pump.isAlive(); } + /** + * Begins the socket connection to the server asynchronously. + * + * @return a {@link CompletableFuture} which completes with itself, once connected. + */ + public @NotNull CompletableFuture connectAsync() { + if (socket != null) { + throw new ReqlDriverError("Client already connected!"); + } + return createSocketAsync().thenApply(socket -> { + this.socket = socket; + HandshakeProtocol.doHandshake(socket, user, password, timeout); + this.pump = pumpFactory.newPump(socket, !persistentThreads); + return this; + }); + } + /** * Begins the socket connection to the server. * * @return itself, once connected. */ public @NotNull Connection connect() { - if (socket != null) { - throw new ReqlDriverError("Client already connected!"); + try { + return connectAsync().join(); + } catch (CompletionException ce) { + Throwable t = ce.getCause(); + if (t instanceof ReqlError) { + throw ((ReqlError) t); + } + throw new ReqlDriverError(t); } - ConnectionSocket socket = socketFactory.newSocket(hostname, port, sslContext, timeout); - this.socket = socket; - HandshakeProtocol.doHandshake(socket, user, password, timeout); - pump = pumpFactory.newPump(socket); - return this; + } + + /** + * Closes and reconnects to the server. + * + * @return a {@link CompletableFuture} which completes with itself, once reconnected. + */ + public @NotNull CompletableFuture reconnectAsync() { + return reconnectAsync(true); + } + + /** + * Closes and reconnects to the server asynchronously. + * + * @param noreplyWait if closing should send a {@link Connection#noreplyWait()} before closing. + * @return a {@link CompletableFuture} which completes with itself, once reconnected. + */ + public @NotNull CompletableFuture reconnectAsync(boolean noreplyWait) { + return closeAsync(noreplyWait).thenCompose(v -> connectAsync()); } /** @@ -138,9 +186,15 @@ public boolean isOpen() { * @return itself, once reconnected. */ public @NotNull Connection reconnect(boolean noreplyWait) { - close(noreplyWait); - connect(); - return this; + try { + return reconnectAsync(noreplyWait).join(); + } catch (CompletionException ce) { + Throwable t = ce.getCause(); + if (t instanceof ReqlError) { + throw ((ReqlError) t); + } + throw new ReqlDriverError(t); + } } /** @@ -151,19 +205,21 @@ public boolean isOpen() { * @param term The ReQL term * @param optArgs The options to run this query with * @param fetchMode The fetch mode to use in partial sequences + * @param unwrap Override for the connection's unwrapLists setting * @param typeRef The type to convert to * @return The result of this query */ public @NotNull CompletableFuture> runAsync(@NotNull ReqlAst term, @NotNull OptArgs optArgs, - @Nullable Result.FetchMode fetchMode, + @Nullable FetchMode fetchMode, + @Nullable Boolean unwrap, @Nullable TypeReference typeRef) { handleOptArgs(optArgs); Query q = Query.createStart(nextToken.incrementAndGet(), term, optArgs); if (optArgs.containsKey("noreply")) { throw new ReqlDriverError("Don't provide the noreply option as an optarg. Use `.runNoReply` instead of `.run`"); } - return runQuery(q, fetchMode, typeRef); + return runQuery(q, fetchMode, unwrap, typeRef); } /** @@ -174,15 +230,17 @@ public boolean isOpen() { * @param term The ReQL term * @param optArgs The options to run this query with * @param fetchMode The fetch mode to use in partial sequences + * @param unwrap Override for the connection's unwrapLists setting * @param typeRef The type to convert to * @return The result of this query */ public @NotNull Result run(@NotNull ReqlAst term, @NotNull OptArgs optArgs, - @Nullable Result.FetchMode fetchMode, + @Nullable FetchMode fetchMode, + @Nullable Boolean unwrap, @Nullable TypeReference typeRef) { try { - return runAsync(term, optArgs, fetchMode, typeRef).join(); + return runAsync(term, optArgs, fetchMode, unwrap, typeRef).join(); } catch (CompletionException ce) { Throwable t = ce.getCause(); if (t instanceof ReqlError) { @@ -229,7 +287,7 @@ public boolean isOpen() { * @return a {@link CompletableFuture} you can await. */ public @NotNull CompletableFuture noreplyWaitAsync() { - return runQuery(Query.createNoreplyWait(nextToken.incrementAndGet()), null, null).thenApply(ignored -> null); + return runQuery(Query.createNoreplyWait(nextToken.incrementAndGet()), null, null, null).thenApply(ignored -> null); } /** @@ -259,6 +317,25 @@ public void runNoReply(@NotNull ReqlAst term, @NotNull OptArgs optArgs) { runQueryNoreply(Query.createStart(nextToken.incrementAndGet(), term, optArgs)); } + /** + * Closes this connection asynchronously, closing all {@link Result}s, the {@link ResponsePump} and the {@link ConnectionSocket}. + * + * @return a {@link CompletableFuture} which completes when everything is closed. + */ + public @NotNull CompletableFuture closeAsync() { + return closeAsync(true); + } + + /** + * Closes this connection asynchronously, closing all {@link Result}s, the {@link ResponsePump} and the {@link ConnectionSocket}. + * + * @param shouldNoreplyWait If the connection should noreply_wait before closing + * @return a {@link CompletableFuture} which completes when everything is closed. + */ + public @NotNull CompletableFuture closeAsync(boolean shouldNoreplyWait) { + return CompletableFuture.runAsync(() -> this.close(shouldNoreplyWait)); + } + /** * Closes this connection, closing all {@link Result}s, the {@link ResponsePump} and the {@link ConnectionSocket}. */ @@ -308,8 +385,21 @@ public void closeResults() { } } + /** + * Checks if there are any ongoing query. + * @return {@code true} if there's an ongoing query that will be closed if this connection is closed. + */ + public boolean hasOngoingQueries() { + return !tracked.isEmpty(); + } + // protected methods + /** + * Sends a STOP query. Used by {@link Result} partial sequences. + * + * @param token the response token. + */ protected void sendStop(long token) { // While the server does reply to the stop request, we ignore that reply. // This works because the response pump in `connect` ignores replies for which @@ -317,16 +407,33 @@ protected void sendStop(long token) { runQueryNoreply(Query.createStop(token)); } - + /** + * Sends a CONTINUE query. Used by {@link Result} partial sequences. + * + * @param token the response token. + * @return a completable future which completes with the next response. + */ protected @NotNull CompletableFuture sendContinue(long token) { return sendQuery(Query.createContinue(token)); } - protected void loseTrackOf(@NotNull Result r) { + /** + * Callback method from {@link Result}, signals that this connection should keep track of this result. + * Tracked results can be closed using {@link Connection#close()} or {@link Connection#closeResults()}. + * + * @param r the result to be tracked. + */ + protected void keepTrackOf(@NotNull Result r) { tracked.add(r); } - protected void keepTrackOf(@NotNull Result r) { + /** + * Callback method from {@link Result}, signals that this connection should no long keep track of this result. + * The result probably finished or was closed. + * + * @param r the result to be tracked. + */ + protected void loseTrackOf(@NotNull Result r) { tracked.remove(r); } @@ -378,24 +485,55 @@ protected void runQueryNoreply(@NotNull Query query) { } } + /** + * Sents a query to the server and returns a {@link CompletableFuture} which completes with a result. + * Runs a ReQL query with options {@code optArgs}, the specified {@code fetchMode} and returns the result, with the + * values converted to the type of {@code TypeReference} + * + * @param The type of result + * @param query The query + * @param fetchMode The fetch mode to use in partial sequences + * @param unwrap Override for the connection's unwrapLists setting + * @param typeRef The type to convert to + * @return The {@link CompletableFuture} which completes with the result of the query. + */ protected @NotNull CompletableFuture> runQuery(@NotNull Query query, - @Nullable Result.FetchMode fetchMode, + @Nullable FetchMode fetchMode, + @Nullable Boolean unwrap, @Nullable TypeReference typeRef) { return sendQuery(query).thenApply(res -> new Result<>( - this, query, res, fetchMode == null ? defaultFetchMode : fetchMode, typeRef - )); + this, query, res, + fetchMode == null ? defaultFetchMode : fetchMode, + unwrap == null ? unwrapLists : unwrap, + typeRef)); } + /** + * Handle optArgs before sending to the server. + * + * @param optArgs the optArgs. + */ protected void handleOptArgs(@NotNull OptArgs optArgs) { - if (!optArgs.containsKey("db") && dbname != null) { + if (optArgs.containsKey("db")) { + // The db arg must be wrapped in a db ast object + optArgs.with("db", new Db(optArgs.get("db"))); + } else if (dbname != null) { // Only override the db global arg if the user hasn't // specified one already and one is specified on the connection - optArgs.with("db", dbname); + optArgs.with("db", new Db(dbname)); } - if (optArgs.containsKey("db")) { - // The db arg must be wrapped in a db ast object - optArgs.with("db", new Db(Arguments.make(optArgs.get("db")))); + } + + /** + * Detects if the connection socket supports async creation or wraps it before returning. + * + * @return a {@link CompletableFuture} which will complete with a new {@link ConnectionSocket}. + */ + protected @NotNull CompletableFuture createSocketAsync() { + if (socketFactory instanceof ConnectionSocket.AsyncFactory) { + return ((ConnectionSocket.AsyncFactory) socketFactory).newSocketAsync(hostname, port, sslContext, timeout); } + return CompletableFuture.supplyAsync(() -> socketFactory.newSocket(hostname, port, sslContext, timeout)); } // builder @@ -404,22 +542,31 @@ protected void handleOptArgs(@NotNull OptArgs optArgs) { * Builder should be used to build a Connection instance. */ public static class Builder { - private @Nullable ConnectionSocket.Factory socketFactory; - private @Nullable ResponsePump.Factory pumpFactory; private @Nullable String hostname; private @Nullable Integer port; - private @Nullable String dbname; - private @Nullable SSLContext sslContext; - private @Nullable Long timeout; - private @Nullable String authKey; private @Nullable String user; private @Nullable String password; - private @Nullable Result.FetchMode defaultFetchMode; + private @Nullable String dbname; + private @Nullable Long timeout; + private @Nullable SSLContext sslContext; + // java-only + private @Nullable ConnectionSocket.Factory socketFactory; + private @Nullable ResponsePump.Factory pumpFactory; + private @Nullable FetchMode defaultFetchMode; private boolean unwrapLists = false; + private boolean persistentThreads = false; + /** + * Creates an empty builder. + */ public Builder() { } + /** + * Parses a db-ul as a builder. + * + * @param uri the db-url to parse. + */ public Builder(@NotNull URI uri) { Objects.requireNonNull(uri, "URI can't be null. Use the default constructor instead."); if (!"rethinkdb".equals(uri.getScheme())) { @@ -435,7 +582,7 @@ public Builder(@NotNull URI uri) { if (userInfo != null && !userInfo.isEmpty()) { String[] split = userInfo.split(":"); if (split.length > 2) { - throw new IllegalArgumentException("Invalid user info."); + throw new IllegalArgumentException("Invalid user info: '" + userInfo + "'"); } if (split.length > 0) { this.user = split[0]; @@ -450,7 +597,7 @@ public Builder(@NotNull URI uri) { if (port != -1) { this.port = port; } - if (path != null) { + if (path != null && !path.isEmpty()) { if (path.charAt(0) == '/') { path = path.substring(1); } @@ -461,113 +608,433 @@ public Builder(@NotNull URI uri) { if (query != null) { String[] kvs = query.split("&"); for (String kv : kvs) { - String[] split = kv.split("="); - if (split.length != 2) { - throw new IllegalArgumentException("Invalid query."); - } - switch (split[0]) { - case "auth_key": { - String authKey = split[1]; - if (authKey.isEmpty()) { - throw new IllegalArgumentException("Invalid query value."); - } - if (authKey.charAt(0) == '\'' && authKey.charAt(authKey.length() - 1) == '\'') { - authKey = authKey.substring(1, authKey.length() - 1).replace("\\'", "'"); - } - this.authKey = authKey; + int i = kv.indexOf('='); + String k = i != -1 ? kv.substring(0, i) : kv; + String v = i != -1 ? kv.substring(i + 1) : ""; + boolean booleanValue = v.isEmpty() || "true".equals(v) || "enabled".equals(v); + switch (k) { + case "timeout": { + this.timeout = Long.parseLong(v); break; } - case "timeout": { - this.timeout = Long.parseLong(split[1]); + case "java.default_fetch_mode": + case "java.defaultFetchMode": { + this.defaultFetchMode = FetchMode.fromString(v); + break; + } + case "java.unwrap_lists": + case "java.unwrapLists": { + this.unwrapLists = booleanValue; + break; + } + case "java.persistent_threads": + case "java.persistentThreads": { + this.persistentThreads = booleanValue; break; } default: { - throw new IllegalArgumentException("Invalid query parameter."); + LOGGER.debug("Invalid query parameter '{}', skipping", k); } } } } } + /** + * Creates a copy of this builder. + * + * @return a copy of this builder. + * @deprecated Use {@link com.rethinkdb.RethinkDB#connection(Builder) r.connection(Builder)} instead. + * (Will be removed on v2.5.0) + */ + @Deprecated public @NotNull Builder copyOf() { - Builder c = new Builder(); - c.socketFactory = socketFactory; - c.pumpFactory = pumpFactory; - c.hostname = hostname; - c.port = port; - c.dbname = dbname; - c.sslContext = sslContext; - c.timeout = timeout; - c.authKey = authKey; - c.user = user; - c.password = password; - c.unwrapLists = unwrapLists; - c.defaultFetchMode = defaultFetchMode; - return c; - } - - public @NotNull Builder socketFactory(@Nullable ConnectionSocket.Factory factory) { - socketFactory = factory; + return new Builder(this); + } + + /** + * Copies a connection builder. + * + * @param b the original builder. + */ + public Builder(@NotNull Builder b) { + hostname = b.hostname; + port = b.port; + user = b.user; + password = b.password; + dbname = b.dbname; + timeout = b.timeout; + sslContext = b.sslContext; + // java-only + socketFactory = b.socketFactory; + pumpFactory = b.pumpFactory; + unwrapLists = b.unwrapLists; + defaultFetchMode = b.defaultFetchMode; + persistentThreads = b.persistentThreads; + } + + /** + * Sets a custom hostname for the connection. + *

(Configurable by Db-url)

+ * + * @param hostname the hostname, or {@code null}. + * @return itself. + */ + public @NotNull Builder hostname(@Nullable String hostname) { + this.hostname = hostname; return this; } - public @NotNull Builder pumpFactory(@Nullable ResponsePump.Factory factory) { - pumpFactory = factory; + /** + * Sets a custom port for the connection. + *

(Configurable by Db-url)

+ * + * @param port the port, or {@code null}. + * @return itself. + */ + public @NotNull Builder port(@Nullable Integer port) { + this.port = port; return this; } - public @NotNull Builder hostname(@Nullable String val) { - hostname = val; + /** + * Sets a custom username for the connection. + *

(Configurable by Db-url)

+ * + * @param user the username, or {@code null}. + * @return itself. + */ + public @NotNull Builder user(@Nullable String user) { + this.user = user; return this; } - public @NotNull Builder port(@Nullable Integer val) { - port = val; + /** + * Sets a custom username and password for the connection. + *

(Configurable by Db-url)

+ * + * @param user the username, or {@code null}. + * @param password the password, or {@code null}. + * @return itself. + */ + public @NotNull Builder user(@Nullable String user, @Nullable String password) { + this.user = user; + this.password = password; return this; } - public @NotNull Builder db(@Nullable String val) { - dbname = val; + /** + * Sets a custom database for the connection. + *

(Configurable by Db-url)

+ * + * @param dbname the database name, or {@code null}. + * @return itself. + */ + public @NotNull Builder db(@Nullable String dbname) { + this.dbname = dbname; return this; } - public @NotNull Builder authKey(@Nullable String key) { - authKey = key; + /** + * Sets a custom timeout for the connection, in milliseconds. + *

(Db-url key: "timeout")

+ * + * @param timeout the timeout, or {@code null}. + * @return itself. + */ + public @NotNull Builder timeout(@Nullable Long timeout) { + this.timeout = timeout; return this; } - public @NotNull Builder user(@Nullable String user, @Nullable String password) { - this.user = user; - this.password = password; + /** + * Sets a custom authentication key for the connection. + *

(No db-url support)

+ * + * @param authKey the authentication key, or {@code null}. + * @return itself. + * @deprecated Use {@link Builder#user(String, String)} instead. + * (Will be removed on v2.5.0) + */ + @Deprecated + public @NotNull Builder authKey(@Nullable String authKey) { + return user(null, authKey); + } + + /** + * Sets a certificate to provide SSL encryption to the RethinkDB Connection. + *

(No db-url support)

+ * + * @param source a callable which provides a {@link InputStream} with the contents of a certificate file. + * @return itself. + */ + public @NotNull Builder certFile(@NotNull Callable source) { + try (InputStream stream = source.call()) { + return sslContext(Internals.readCertFile(stream)); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Sets a certificate to provide SSL encryption to the RethinkDB Connection. + *

(No db-url support)

+ * + * @param source a {@link InputStream} with the contents of a certificate file. + * @return itself. + */ + public @NotNull Builder certFile(@NotNull InputStream source) { + return certFile(() -> source); + } + + /** + * Sets a certificate to provide SSL encryption to the RethinkDB Connection. + *

(No db-url support)

+ * + * @param file a certificate file to read from. + * @return itself. + */ + public @NotNull Builder certFile(@NotNull File file) { + return certFile(() -> new FileInputStream(file)); + } + + /** + * Sets a {@link SSLContext} to provide SSL encryption to the RethinkDB Connection. + *

(No db-url support)

+ * + * @param sslContext the SSL context, or {@code null}. + * @return itself. + */ + public @NotNull Builder sslContext(@Nullable SSLContext sslContext) { + this.sslContext = sslContext; return this; } - public @NotNull Builder certFile(@NotNull InputStream val) { - return sslContext(Internals.readCertFile(val)); + /** + * Sets a custom {@link ConnectionSocket} factory for the connection. + *

(Java Driver-specific, No db-url support)

+ * + * @param socketFactory the connection socket factory, or {@code null}. + * @return itself. + */ + public @NotNull Builder socketFactory(@Nullable ConnectionSocket.Factory socketFactory) { + this.socketFactory = socketFactory; + return this; } - public @NotNull Builder sslContext(@Nullable SSLContext val) { - sslContext = val; + /** + * Sets a custom {@link ResponsePump} factory for the connection. + *

(Java Driver-specific, No db-url support)

+ * + * @param pumpFactory the response pump factory, or {@code null}. + * @return itself. + */ + public @NotNull Builder pumpFactory(@Nullable ResponsePump.Factory pumpFactory) { + this.pumpFactory = pumpFactory; return this; } - public @NotNull Builder unwrapLists(boolean val) { - unwrapLists = val; + /** + * Sets the default fetch mode for sequences. + * + *
+ * Fetch mode is a Java-driver specific behaviour that allows for fine-tuning on partial sequence fetching. + *

+ * Can be used to balance between high availability and network optimization. The + * {@linkplain FetchMode#AGGRESSIVE aggressive} fetch mode will make best effort to consume the entire sequence, as + * fast as possible, to ensure high availability to the consumer, while the {@linkplain FetchMode#LAZY lazy} + * fetch mode will make no effort and await until all objects were consumed before fetching the next one.
+ * In addiction, there are many preemptive fetch modes, which will consume the next sequence once the buffer + * reaches {@linkplain FetchMode#PREEMPTIVE_HALF half}, a {@linkplain FetchMode#PREEMPTIVE_THIRD third}, + * a {@linkplain FetchMode#PREEMPTIVE_FOURTH fourth}, a {@linkplain FetchMode#PREEMPTIVE_FIFTH fitfh}, + * a {@linkplain FetchMode#PREEMPTIVE_SIXTH sixth}, a {@linkplain FetchMode#PREEMPTIVE_SEVENTH seventh} or + * an {@linkplain FetchMode#PREEMPTIVE_EIGHTH eighth} of it's capacity. + *

+ * + *

(Java Driver-specific, Db-url key: "java.default_fetch_mode")

+ * + * @param defaultFetchMode a default fetch mode, or {@code null}. + * @return itself. + */ + public @NotNull Builder defaultFetchMode(@Nullable FetchMode defaultFetchMode) { + this.defaultFetchMode = defaultFetchMode; return this; } - public @NotNull Builder defaultFetchMode(@Nullable Result.FetchMode val) { - defaultFetchMode = val; + /** + * Sets list unwrapping behaviour for lists. + * + *
+ * List unwrapping is a Java-driver specific behaviour that unwraps an atom response from the server, + * which is a list, as if it were a sequence of objects. + *

+ * Consider the following: + * {@link TopLevel#expr(Object) r.expr}({@link TopLevel#array(Object, Object...) r.array("a", "b", "c")}).{@link com.rethinkdb.gen.ast.ReqlExpr#run(Connection) run(conn)}; + *

+ * By default, it returns a {@link Result} with a single {@link List}["a","b","c"] inside. + *

+ * With list unwrapping, it returns a {@link Result} with "a", "b", "c" inside. + *

+ * The feature makes the code a bit less verbose. For example, iterating goes from: + *

+ * (({@code List}) {@link Result result}{@link Result#single() .single()}){@link List#forEach(Consumer) .forEach(s -> ...)} + *

+ * To: + *

+ * {@link Result result}{@link Result#forEach(Consumer) .forEach(s -> ...)} + *

+ * + *

(Java Driver-specific, Db-url key: "java.unwrap_lists")

+ * + * @param enabled {@code true} to enable list unwrapping, {@code false} to disable. + * @return itself. + */ + public @NotNull Builder unwrapLists(boolean enabled) { + unwrapLists = enabled; return this; } - public @NotNull Builder timeout(@Nullable Long val) { - timeout = val; + /** + * Sets if the response pump should use {@linkplain Thread#setDaemon(boolean) daemon threads} or not.
+ *
+ * Using persistent threads guarantees that the JVM will not exit once the main thread finishes, but will keep + * the JVM alive if the connection is not closed.
+ * Daemon threads will ensure that the JVM can exit automatically, but may or may not ignore ongoing + * asynchronous queries. Your milage may vary. (HTTP or other application frameworks may open persistent + * threads by themselves and may keep the JVM alive until the main window or application shuts down.) + *
+ * + *

(Java Driver-specific, Db-url key: "java.default_fetch_mode")

+ * + * @param enabled {@code true} to use persistent threads in the response pump, {@code false} to use daemon threads. + * @return itself. + * @see Thread#setDaemon(boolean) + */ + public @NotNull Builder persistentThreads(boolean enabled) { + persistentThreads = enabled; return this; } + /** + * Creates a new connection and connects asynchronously to the server. + * + * @return a {@link CompletableFuture} which completes with the connection once connected. + */ + public @NotNull CompletableFuture connectAsync() { + return new Connection(this).connectAsync(); + } + + /** + * Creates a new connection and connects to the server. + * + * @return a newly created connection, connected to the server. + */ public @NotNull Connection connect() { return new Connection(this).connect(); } + + /** + * Creates a {@link URI} with the db-url representing this connection builder. + * + * @return the db-url as a {@link URI}. + */ + public @NotNull URI dbUrl() { + return URI.create(dbUrlString()); + } + + /** + * Creates a db-url representing this connection builder. + * + * @return the db-url. + */ + public @NotNull String dbUrlString() { + StringBuilder b = new StringBuilder("rethinkdb://"); + + if (user != null) { + b.append(user); + if (password != null) { + b.append(':').append(password); + } + b.append('@'); + } + + b.append(hostname != null ? hostname : "127.0.0.1"); + + if (port != null) { + b.append(':').append(port); + } + + if (dbname != null) { + b.append('/').append(dbname); + } + + boolean first = true; + if (timeout != null) { + b.append('?'); + first = false; + + b.append("timeout=").append(timeout); + } + if (defaultFetchMode != null) { + b.append(first ? '?' : "&"); + first = false; + + b.append("java.default_fetch_mode=").append(defaultFetchMode.name().toLowerCase()); + } + if (unwrapLists) { + b.append(first ? '?' : "&"); + first = false; + + b.append("java.unwrap_lists=true"); + } + if (persistentThreads) { + b.append(first ? '?' : "&"); + + first = false; + b.append("java.persistent_threads=true"); + } + + return b.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Builder builder = (Builder) o; + return Objects.equals(hostname, builder.hostname) && + Objects.equals(port, builder.port) && + Objects.equals(user, builder.user) && + Objects.equals(password, builder.password) && + Objects.equals(dbname, builder.dbname) && + Objects.equals(timeout, builder.timeout) && + Objects.equals(sslContext, builder.sslContext) && + Objects.equals(socketFactory, builder.socketFactory) && + Objects.equals(pumpFactory, builder.pumpFactory) && + Objects.equals(defaultFetchMode, builder.defaultFetchMode) && + unwrapLists == builder.unwrapLists && + persistentThreads == builder.persistentThreads; + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return Objects.hash( + hostname, port, user, password, dbname, timeout, sslContext, + socketFactory, pumpFactory, defaultFetchMode, unwrapLists, persistentThreads + ); + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return "Builder{" + dbUrlString() + '}'; + } } } diff --git a/src/main/java/com/rethinkdb/net/ConnectionSocket.java b/src/main/java/com/rethinkdb/net/ConnectionSocket.java index 802d911b..ceb0eaa6 100644 --- a/src/main/java/com/rethinkdb/net/ConnectionSocket.java +++ b/src/main/java/com/rethinkdb/net/ConnectionSocket.java @@ -5,9 +5,8 @@ import javax.net.ssl.SSLContext; import java.io.Closeable; -import java.io.IOError; -import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; /** * A connection socket into the server. @@ -19,20 +18,57 @@ public interface ConnectionSocket extends Closeable { interface Factory { /** * Creates a new connection socket into the server. - * @param hostname the hostname - * @param port the post + * + * @param hostname the hostname + * @param port the post * @param sslContext an {@link SSLContext}, if any - * @param timeoutMs a timeout, in milliseconds, if any + * @param timeoutMs a timeout, in milliseconds, if any * @return a new {@link ConnectionSocket}. */ @NotNull ConnectionSocket newSocket(@NotNull String hostname, - int port, - @Nullable SSLContext sslContext, - @Nullable Long timeoutMs); + int port, + @Nullable SSLContext sslContext, + @Nullable Long timeoutMs); + } + + /** + * An asynchronous factory of sockets. + */ + interface AsyncFactory extends Factory { + /** + * Creates a new connection socket into the server. + * + * @param hostname the hostname + * @param port the post + * @param sslContext an {@link SSLContext}, if any + * @param timeoutMs a timeout, in milliseconds, if any + * @return a new {@link ConnectionSocket}. + */ + default @NotNull ConnectionSocket newSocket(@NotNull String hostname, + int port, + @Nullable SSLContext sslContext, + @Nullable Long timeoutMs) { + return newSocketAsync(hostname, port, sslContext, timeoutMs).join(); + } + + /** + * Creates a new connection socket asynchronously into the server. + * + * @param hostname the hostname + * @param port the post + * @param sslContext an {@link SSLContext}, if any + * @param timeoutMs a timeout, in milliseconds, if any + * @return a {@link CompletableFuture} which will complete with a new {@link ConnectionSocket}. + */ + @NotNull CompletableFuture newSocketAsync(@NotNull String hostname, + int port, + @Nullable SSLContext sslContext, + @Nullable Long timeoutMs); } /** * Checks if the connection socket is open. + * * @return true if the connection socket is open, false otherwise. */ boolean isOpen(); @@ -44,12 +80,14 @@ interface Factory { /** * Writes the contents of the buffer into the socket. + * * @param buffer the contents to write. */ void write(@NotNull ByteBuffer buffer); /** * Reads a defined amount of bytes, and wraps it in a {@link ByteBuffer}. + * * @param length the length of bytes to read. * @return a {@link ByteBuffer} with the read contents. */ diff --git a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java index 32b3b28a..bab9e3ba 100644 --- a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java +++ b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java @@ -22,22 +22,23 @@ /** * The default {@link ConnectionSocket.Factory} and {@link ResponsePump.Factory} for any default connections. */ -public class DefaultConnectionFactory implements ConnectionSocket.Factory, ResponsePump.Factory { +public class DefaultConnectionFactory implements ConnectionSocket.AsyncFactory, ResponsePump.Factory { public static final DefaultConnectionFactory INSTANCE = new DefaultConnectionFactory(); private DefaultConnectionFactory() { } @Override - public @NotNull ConnectionSocket newSocket(@NotNull String hostname, int port, SSLContext sslContext, Long timeoutMs) { - SocketWrapper s = new SocketWrapper(hostname, port, sslContext, timeoutMs); - s.connect(); - return s; + public @NotNull CompletableFuture newSocketAsync(@NotNull String hostname, + int port, + @Nullable SSLContext sslContext, + @Nullable Long timeoutMs) { + return CompletableFuture.supplyAsync(() -> new SocketWrapper(hostname, port, sslContext, timeoutMs).connect()); } @Override - public @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket) { - return new ThreadResponsePump(socket); + public @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket, boolean daemonThreads) { + return new ThreadResponsePump(socket, daemonThreads); } private static class SocketWrapper implements ConnectionSocket { @@ -62,7 +63,7 @@ private static class SocketWrapper implements ConnectionSocket { this.timeoutMs = timeoutMs; } - void connect() { + SocketWrapper connect() { try { // establish connection final InetSocketAddress addr = new InetSocketAddress(hostname, port); @@ -92,6 +93,7 @@ void connect() { } catch (IOException e) { throw new ReqlDriverError("Connection timed out.", e); } + return this; } @Override @@ -178,7 +180,7 @@ private static class ThreadResponsePump implements ResponsePump { private final Thread thread; private Map> awaiting = new ConcurrentHashMap<>(); - public ThreadResponsePump(ConnectionSocket socket) { + public ThreadResponsePump(ConnectionSocket socket, boolean daemon) { this.thread = new Thread(() -> { // pump responses until interrupted while (true) { @@ -194,17 +196,24 @@ public ThreadResponsePump(ConnectionSocket socket) { // read response and send it to whoever is waiting, if anyone try { - final Response response = Response.readFromSocket(socket); - final CompletableFuture awaiter = awaiting.remove(response.token); - if (awaiter != null) { - awaiter.complete(response); - } + CompletableFuture.supplyAsync(Response.readFromSocket(socket)).handle((response, t) -> { + if (t != null) { + shutdown(t); + } else { + final CompletableFuture awaiter = awaiting.remove(response.token); + if (awaiter != null) { + awaiter.complete(response); + } + } + return null; + }); } catch (Exception e) { shutdown(e); return; } } }, "RethinkDB-" + socket + "-ResponsePump"); + thread.setDaemon(daemon); thread.start(); } @@ -223,12 +232,12 @@ public boolean isAlive() { return thread.isAlive(); } - private void shutdown(Exception e) { + private void shutdown(Throwable t) { Map> awaiting = this.awaiting; this.awaiting = null; thread.interrupt(); if (awaiting != null) { - awaiting.forEach((token, future) -> future.completeExceptionally(e)); + awaiting.forEach((token, future) -> future.completeExceptionally(t)); } } diff --git a/src/main/java/com/rethinkdb/net/Response.java b/src/main/java/com/rethinkdb/net/Response.java index 2f7f3b0d..8875a020 100644 --- a/src/main/java/com/rethinkdb/net/Response.java +++ b/src/main/java/com/rethinkdb/net/Response.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Supplier; import java.util.stream.Collectors; public class Response { @@ -83,7 +84,7 @@ public String toString() { } @SuppressWarnings("unchecked") - public static Response readFromSocket(ConnectionSocket socket) { + public static Supplier readFromSocket(ConnectionSocket socket) { final ByteBuffer header = socket.read(12); final long token = header.getLong(); final int responseLength = header.getInt(); @@ -100,20 +101,22 @@ public static Response readFromSocket(ConnectionSocket socket) { ); } - Map json = Internals.readJson(buffer); - return new Response( - token, - ResponseType.fromValue(((Long) json.get("t")).intValue()), - (List) json.getOrDefault("r", Collections.emptyList()), - ((List) json.getOrDefault("n", Collections.emptyList())) - .stream() - .map(Long::intValue) - .map(ResponseNote::maybeFromValue) - .filter(Objects::nonNull) - .collect(Collectors.toList()), - Profile.fromList((List) json.get("p")), - Backtrace.fromList((List) json.getOrDefault("b", null)), - json.containsKey("e") ? ErrorType.maybeFromValue(((Long) json.get("e")).intValue()) : null - ); + return () -> { + Map json = Internals.readJson(buffer); + return new Response( + token, + ResponseType.fromValue(((Long) json.get("t")).intValue()), + (List) json.getOrDefault("r", Collections.emptyList()), + ((List) json.getOrDefault("n", Collections.emptyList())) + .stream() + .map(Long::intValue) + .map(ResponseNote::maybeFromValue) + .filter(Objects::nonNull) + .collect(Collectors.toList()), + Profile.fromList((List) json.get("p")), + Backtrace.fromList((List) json.getOrDefault("b", null)), + json.containsKey("e") ? ErrorType.maybeFromValue(((Long) json.get("e")).intValue()) : null + ); + }; } } diff --git a/src/main/java/com/rethinkdb/net/ResponsePump.java b/src/main/java/com/rethinkdb/net/ResponsePump.java index 497b223e..9546eb67 100644 --- a/src/main/java/com/rethinkdb/net/ResponsePump.java +++ b/src/main/java/com/rethinkdb/net/ResponsePump.java @@ -14,14 +14,31 @@ public interface ResponsePump { interface Factory { /** * Creates a new response pump using the provided connection socket. + * * @param socket the {@link ConnectionSocket} to pump response pumps from * @return a new {@link ResponsePump}. + * @deprecated Implement the {@link Factory#newPump(ConnectionSocket, boolean)} method. + * (Will be removed on v2.5.0) */ - @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket); + @Deprecated + default @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket) { + throw new UnsupportedOperationException(); + } + + /** + * Creates a new response pump using the provided connection socket. + * + * @param socket the {@link ConnectionSocket} to pump response pumps from + * @param daemonThreads suggestion for using daemon threads and not blocking the process to exit. + * @return a new {@link ResponsePump}. + */ + @NotNull + ResponsePump newPump(@NotNull ConnectionSocket socket, boolean daemonThreads); } /** * Creates a response awaiter for a query token. + * * @param token the query token * @return a {@link CompletableFuture} that completes with a {@link Response} that matches the token. */ @@ -30,6 +47,7 @@ interface Factory { /** * Checks if the response pump is alive. + * * @return true if the response pump is alive, false otherwise. */ boolean isAlive(); diff --git a/src/main/java/com/rethinkdb/net/Result.java b/src/main/java/com/rethinkdb/net/Result.java index e5f150e1..3d91667b 100644 --- a/src/main/java/com/rethinkdb/net/Result.java +++ b/src/main/java/com/rethinkdb/net/Result.java @@ -77,7 +77,16 @@ public enum FetchMode { /** * Fetches the next part of the sequence once the buffer becomes empty. */ - LAZY + LAZY; + + @Nullable + public static FetchMode fromString(String s) { + try { + return valueOf(s.toUpperCase()); + } catch (RuntimeException ignored) { + return null; + } + } } protected final Connection connection; @@ -85,6 +94,7 @@ public enum FetchMode { protected final Response firstRes; protected final TypeReference typeRef; protected final Internals.FormatOptions fmt; + protected final boolean unwrapLists; // can be altered depending on the operation protected FetchMode fetchMode; @@ -102,13 +112,15 @@ public Result(Connection connection, Query query, Response firstRes, FetchMode fetchMode, + boolean unwrapLists, TypeReference typeRef) { this.connection = connection; this.query = query; this.firstRes = firstRes; this.fetchMode = fetchMode; this.typeRef = typeRef; - fmt = new Internals.FormatOptions(query.globalOptions); + this.fmt = Internals.parseFormatOptions(query.globalOptions); + this.unwrapLists = unwrapLists; currentResponse.set(firstRes); handleFirstResponse(); } @@ -551,7 +563,7 @@ protected void emitData(Response res) { } lastRequestCount.set(0); for (Object each : (List) Internals.convertPseudotypes(res.data, fmt)) { - if (connection.unwrapLists && firstRes.type.equals(ResponseType.SUCCESS_ATOM) && each instanceof List) { + if (unwrapLists && firstRes.type.equals(ResponseType.SUCCESS_ATOM) && each instanceof List) { for (Object o : ((List) each)) { rawQueue.offer(o == null ? NIL : o); lastRequestCount.incrementAndGet(); diff --git a/src/main/java/com/rethinkdb/utils/Internals.java b/src/main/java/com/rethinkdb/utils/Internals.java index b42a9dc6..2a729cba 100644 --- a/src/main/java/com/rethinkdb/utils/Internals.java +++ b/src/main/java/com/rethinkdb/utils/Internals.java @@ -166,7 +166,7 @@ private static Object handlePseudotypes(Map value, FormatOptions fmt) { } return ((List) value.get("data")).stream() .map(it -> new ArrayList<>((Collection) it)) - .map(it -> new GroupedResult<>(it.remove(0), it)) + .map(it -> new GroupedResult<>(it.get(0), ((List) it.get(1)))) .collect(Collectors.toList()); } case BINARY: { @@ -272,20 +272,34 @@ public static SSLContext readCertFile(@NotNull InputStream certFile) { } } + public static FormatOptions parseFormatOptions(OptArgs args) { + if (args == null) return FormatOptions.DEFAULT; + + Datum time_format = (Datum) args.get("time_format"); + boolean rawTime = time_format != null && "raw".equals(time_format.datum); + + Datum binary_format = (Datum) args.get("binary_format"); + boolean rawBinary = binary_format != null && "raw".equals(binary_format.datum); + + Datum group_format = (Datum) args.get("group_format"); + boolean rawGroups = group_format != null && "raw".equals(group_format.datum); + + if (rawTime || rawBinary || rawGroups) { + return new FormatOptions(rawTime, rawGroups, rawBinary); + } + return FormatOptions.DEFAULT; + } + public static class FormatOptions { + public static final FormatOptions DEFAULT = new FormatOptions(false, false, false); public final boolean rawTime; public final boolean rawGroups; public final boolean rawBinary; - public FormatOptions(OptArgs args) { - Datum time_format = (Datum) args.get("time_format"); - this.rawTime = time_format != null && "raw".equals(time_format.datum); - - Datum binary_format = (Datum) args.get("binary_format"); - this.rawBinary = binary_format != null && "raw".equals(binary_format.datum); - - Datum group_format = (Datum) args.get("group_format"); - this.rawGroups = group_format != null && "raw".equals(group_format.datum); + public FormatOptions(boolean rawTime, boolean rawGroups, boolean rawBinary) { + this.rawTime = rawTime; + this.rawGroups = rawGroups; + this.rawBinary = rawBinary; } } } diff --git a/src/main/java/com/rethinkdb/utils/Types.java b/src/main/java/com/rethinkdb/utils/Types.java index 3ac9079c..41654ce5 100644 --- a/src/main/java/com/rethinkdb/utils/Types.java +++ b/src/main/java/com/rethinkdb/utils/Types.java @@ -1,6 +1,7 @@ package com.rethinkdb.utils; import com.fasterxml.jackson.core.type.TypeReference; +import com.rethinkdb.model.GroupedResult; import java.lang.reflect.MalformedParameterizedTypeException; import java.lang.reflect.ParameterizedType; @@ -67,6 +68,30 @@ public static TypeReference> mapOf(TypeReference keyType, Ty ); } + public static TypeReference> groupOf(Class keyType, Class valueType) { + return new BuiltTypeRef<>( + new BuiltParametrizedType(GroupedResult.class, Objects.requireNonNull(keyType, "keyType"), Objects.requireNonNull(valueType, "valueType")) + ); + } + + public static TypeReference> groupOf(TypeReference keyType, Class valueType) { + return new BuiltTypeRef<>( + new BuiltParametrizedType(GroupedResult.class, Objects.requireNonNull(keyType, "keyType").getType(), Objects.requireNonNull(valueType, "valueType")) + ); + } + + public static TypeReference> groupOf(Class keyType, TypeReference valueType) { + return new BuiltTypeRef<>( + new BuiltParametrizedType(GroupedResult.class, Objects.requireNonNull(keyType, "keyType"), Objects.requireNonNull(valueType, "valueType").getType()) + ); + } + + public static TypeReference> groupOf(TypeReference keyType, TypeReference valueType) { + return new BuiltTypeRef<>( + new BuiltParametrizedType(GroupedResult.class, Objects.requireNonNull(keyType, "keyType").getType(), Objects.requireNonNull(valueType, "valueType").getType()) + ); + } + private static class BuiltTypeRef extends TypeReference { private final Type type; diff --git a/src/test/java/com/rethinkdb/AuthTest.java b/src/test/java/com/rethinkdb/AuthTest.java index f85ede78..62ee158d 100644 --- a/src/test/java/com/rethinkdb/AuthTest.java +++ b/src/test/java/com/rethinkdb/AuthTest.java @@ -35,14 +35,14 @@ public static void oneTimeTearDown() throws Exception { @Test public void testConnectWithNonAdminUser() throws Exception { - Connection bogusConn = TestingFramework.defaultConnectionBuilder().copyOf() + Connection bogusConn = r.connection(TestingFramework.defaultConnectionBuilder()) .user(bogusUsername, bogusPassword).connect(); bogusConn.close(); } @Test (expected=ReqlDriverError.class) public void testConnectWithBothAuthKeyAndUsername() throws Exception { - Connection bogusConn = TestingFramework.defaultConnectionBuilder().copyOf() + Connection bogusConn = r.connection(TestingFramework.defaultConnectionBuilder()) .user(bogusUsername, bogusPassword).authKey("test").connect(); } } diff --git a/src/test/java/com/rethinkdb/DbUrlTest.java b/src/test/java/com/rethinkdb/DbUrlTest.java new file mode 100644 index 00000000..bdb016fc --- /dev/null +++ b/src/test/java/com/rethinkdb/DbUrlTest.java @@ -0,0 +1,124 @@ +package com.rethinkdb; + +import com.rethinkdb.net.Result; +import org.junit.Test; + +import java.net.URI; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class DbUrlTest { + public static final RethinkDB r = RethinkDB.r; + private static final String DB_URL_STANDARD = + "rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?timeout=30"; + private static final String DB_URL_NON_STANDARD = + "rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?timeout=30&java.default_fetch_mode=lazy&java.unwrap_lists=true&java.persistent_threads=true"; + private static final String DB_URL_NON_STANDARD_ALTERNATE = + "rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?timeout=30&java.defaultFetchMode=lazy&java.unwrapLists=enabled&java.persistentThreads=enabled"; + + @Test + public void testStandardDbUrl() { + assertEquals(DB_URL_STANDARD, r.connection(DB_URL_STANDARD).dbUrlString()); + assertEquals(URI.create(DB_URL_STANDARD), r.connection(DB_URL_STANDARD).dbUrl()); + assertEquals( + r.connection(DB_URL_STANDARD), + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + + .timeout(30L) + ); + assertEquals( + DB_URL_STANDARD, + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + + .timeout(30L) + .dbUrlString() + ); + } + + @Test + public void testNonStandardDbUrl() { + assertEquals(DB_URL_NON_STANDARD, r.connection(DB_URL_NON_STANDARD).dbUrlString()); + assertEquals(URI.create(DB_URL_NON_STANDARD), r.connection(DB_URL_NON_STANDARD).dbUrl()); + assertEquals( + r.connection(DB_URL_NON_STANDARD), + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + .timeout(30L) + .defaultFetchMode(Result.FetchMode.LAZY) + .unwrapLists(true) + .persistentThreads(true) + ); + assertEquals( + DB_URL_NON_STANDARD, + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + .timeout(30L) + .defaultFetchMode(Result.FetchMode.LAZY) + .unwrapLists(true) + .persistentThreads(true) + .dbUrlString() + ); + } + + + @Test + public void testNonStandardAlternateDbUrl() { + assertEquals(DB_URL_NON_STANDARD, r.connection(DB_URL_NON_STANDARD_ALTERNATE).dbUrlString()); + assertEquals(URI.create(DB_URL_NON_STANDARD), r.connection(DB_URL_NON_STANDARD_ALTERNATE).dbUrl()); + assertEquals( + r.connection(DB_URL_NON_STANDARD), + r.connection() + .user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") + .timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).persistentThreads(true) + ); + assertEquals( + r.connection(DB_URL_NON_STANDARD_ALTERNATE), + r.connection() + .user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") + .timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).persistentThreads(true) + ); + assertEquals( + DB_URL_NON_STANDARD, + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + .timeout(30L) + .defaultFetchMode(Result.FetchMode.LAZY) + .unwrapLists(true) + .persistentThreads(true) + .dbUrlString() + ); + assertNotEquals(DB_URL_NON_STANDARD_ALTERNATE, r.connection(DB_URL_NON_STANDARD_ALTERNATE).dbUrlString()); + assertNotEquals(URI.create(DB_URL_NON_STANDARD_ALTERNATE), r.connection(DB_URL_NON_STANDARD_ALTERNATE).dbUrl()); + assertNotEquals( + DB_URL_NON_STANDARD_ALTERNATE, + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + .timeout(30L) + .defaultFetchMode(Result.FetchMode.LAZY) + .unwrapLists(true) + .persistentThreads(true) + .dbUrlString() + ); + } +} diff --git a/src/test/java/com/rethinkdb/RethinkDBTest.java b/src/test/java/com/rethinkdb/RethinkDBTest.java index abc52b29..b6b4ddea 100644 --- a/src/test/java/com/rethinkdb/RethinkDBTest.java +++ b/src/test/java/com/rethinkdb/RethinkDBTest.java @@ -401,7 +401,7 @@ public void testConcurrentCursor() throws TimeoutException, InterruptedException } @Test - public void testNoreply() throws Exception { + public void testNoreply() { r.expr(null).runNoReply(conn); } } diff --git a/src/test/java/com/rethinkdb/RunHelperTest.java b/src/test/java/com/rethinkdb/RunHelperTest.java new file mode 100644 index 00000000..ad713a6f --- /dev/null +++ b/src/test/java/com/rethinkdb/RunHelperTest.java @@ -0,0 +1,164 @@ +package com.rethinkdb; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.rethinkdb.gen.exc.ReqlError; +import com.rethinkdb.gen.proto.ResponseType; +import com.rethinkdb.model.MapObject; +import com.rethinkdb.net.Connection; +import com.rethinkdb.net.Result; +import com.rethinkdb.utils.Types; +import org.junit.*; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class RunHelperTest { + private static final TypeReference> stringList = Types.listOf(String.class); + private static final TypeReference> stringObjectMap = Types.mapOf(String.class, Object.class); + + public static final RethinkDB r = RethinkDB.r; + Connection conn; + public static final String dbName = "javatests"; + public static final String tableName = "atest"; + + @BeforeClass + public static void oneTimeSetUp() throws Exception { + Connection conn = TestingFramework.createConnection(); + try { + r.dbCreate(dbName).run(conn); + } catch (ReqlError e) { + } + try { + r.db(dbName).wait_().run(conn); + r.db(dbName).tableCreate(tableName).run(conn); + r.db(dbName).table(tableName).wait_().run(conn); + } catch (ReqlError e) { + } + conn.close(); + } + + @AfterClass + public static void oneTimeTearDown() throws Exception { + Connection conn = TestingFramework.createConnection(); + try { + r.db(dbName).tableDrop(tableName).run(conn); + r.dbDrop(dbName).run(conn); + } catch (ReqlError e) { + } + conn.close(); + } + + @Before + public void setUp() throws Exception { + conn = TestingFramework.createConnection(); + r.db(dbName).table(tableName).delete().run(conn); + } + + @After + public void tearDown() throws Exception { + conn.close(); + } + + + @Test + public void testRun() { + MapObject expected = r.hashMap("foo", r.hashMap("num", 1L)); + Result result = r.expr(expected).run(conn); + assertEquals(ResponseType.SUCCESS_ATOM, result.responseType()); + assertEquals(expected, result.single()); + assertFalse(conn.hasOngoingQueries()); + } + + @Test + public void testRunAsync() { + MapObject expected = r.hashMap("foo", r.hashMap("num", 1L)); + CompletableFuture> result = r.expr(expected).runAsync(conn); + assertFalse(conn.hasOngoingQueries()); + assertEquals(ResponseType.SUCCESS_ATOM, result.thenApply(Result::responseType).join()); + assertEquals(expected, result.thenApply(Result::single).join()); + } + + @Test + public void testRunAtom() { + MapObject expected = r.hashMap("foo", r.hashMap("num", 1L)); + assertEquals(expected, r.expr(expected).runAtom(conn)); + assertFalse(conn.hasOngoingQueries()); + } + + @Test + public void testRunAtomAsync() { + MapObject expected = r.hashMap("foo", r.hashMap("num", 1L)); + CompletableFuture result = r.expr(expected).runAtomAsync(conn); + assertFalse(conn.hasOngoingQueries()); + assertEquals(expected, result.join()); + } + + @Test + public void testRunGrouping() { + MapObject obj1 = r.hashMap("name", "foo").with("value", 1L); + MapObject obj2 = r.hashMap("name", "bar").with("value", 3L); + MapObject obj3 = r.hashMap("name", "foo").with("value", 6L); + MapObject obj4 = r.hashMap("name", "bar").with("value", 11L); + + MapObject> expected = new MapObject>() + .with("foo", new HashSet<>(r.array(obj1, obj3))) + .with("bar", new HashSet<>(r.array(obj2, obj4))); + + Map> result = r.expr(r.array(obj1, obj2, obj3, obj4)) + .group("name") + .runGrouping(conn, Object.class, Object.class); + + assertEquals(expected, result); + assertFalse(conn.hasOngoingQueries()); + } + + @Test + public void testRunGroupingAsync() { + MapObject obj1 = r.hashMap("name", "foo").with("value", 1L); + MapObject obj2 = r.hashMap("name", "bar").with("value", 3L); + MapObject obj3 = r.hashMap("name", "foo").with("value", 6L); + MapObject obj4 = r.hashMap("name", "bar").with("value", 11L); + + MapObject> expected = new MapObject>() + .with("foo", new HashSet<>(r.array(obj1, obj3))) + .with("bar", new HashSet<>(r.array(obj2, obj4))); + + CompletableFuture>> result = r.expr(r.array(obj1, obj2, obj3, obj4)) + .group("name") + .runGroupingAsync(conn, Object.class, Object.class); + + assertFalse(conn.hasOngoingQueries()); + assertEquals(expected, result.join()); + } + + @Test + public void testRunUnwrapping() { + MapObject expected1 = r.hashMap("foo", r.hashMap("num", 1L)); + MapObject expected2 = r.hashMap("bar", r.hashMap("num", 2L)); + MapObject expected3 = r.hashMap("zzz", r.hashMap("num", 4L)); + Result result = r.expr(r.array(expected1, expected2, expected3)).runUnwrapping(conn); + assertEquals(ResponseType.SUCCESS_ATOM, result.responseType()); + assertEquals(expected1, result.next()); + assertEquals(expected2, result.next()); + assertEquals(expected3, result.next()); + assertFalse(conn.hasOngoingQueries()); + } + + @Test + public void testRunUnwrappingAsync() { + MapObject expected1 = r.hashMap("foo", r.hashMap("num", 1L)); + MapObject expected2 = r.hashMap("bar", r.hashMap("num", 2L)); + MapObject expected3 = r.hashMap("zzz", r.hashMap("num", 4L)); + CompletableFuture> result = r.expr(r.array(expected1, expected2, expected3)).runUnwrappingAsync(conn); + assertFalse(conn.hasOngoingQueries()); + assertEquals(expected1, result.thenApply(Result::next).join()); + assertEquals(expected2, result.thenApply(Result::next).join()); + assertEquals(expected3, result.thenApply(Result::next).join()); + } +} diff --git a/src/test/java/com/rethinkdb/TestingFramework.java b/src/test/java/com/rethinkdb/TestingFramework.java index df05d8e7..c72cacce 100644 --- a/src/test/java/com/rethinkdb/TestingFramework.java +++ b/src/test/java/com/rethinkdb/TestingFramework.java @@ -1,78 +1,45 @@ package com.rethinkdb; import com.rethinkdb.net.Connection; +import com.rethinkdb.net.Result; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Objects; -import java.util.Properties; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; + +import static com.rethinkdb.RethinkDB.*; +import static java.nio.charset.StandardCharsets.*; /** * Very basic testing framework lying miserably in the java's default package. */ public class TestingFramework { + private static final String OVERRIDE_FILE_NAME = "test-dburl-override.txt"; - private static final String DEFAULT_CONFIG_RESOURCE = "default-config.properties"; - private static final String OVERRIDE_FILE_NAME = "test-config-override.properties"; - - // properties used to populate configuration - private static final String PROP_HOSTNAME = "hostName"; - private static final String PROP_PORT = "port"; - private static final String PROP_AUTHKEY = "authKey"; - - private static Connection.Builder defaultConnectionBuilder; + private static Connection.Builder builder; /** * Provision a connection builder based on the test configuration. *

- * Put a propertiy file called "test-config-override.properties" in the working - * directory of the tests to override default values. - *

- * Example: - *

-     *     hostName=myHost
-     *     port=12345
-     * 
- *

+ * Put a propertiy file called "test-dburl-override.txt" in the working directory of the tests to override default values. + * The file contents must be a RethinkDB db-url. * * @return Default connection builder. */ public static Connection.Builder defaultConnectionBuilder() { - if (defaultConnectionBuilder == null) { - Properties config = new Properties(); - - try (InputStream is = TestingFramework.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG_RESOURCE)) { - config.load(Objects.requireNonNull(is)); - } catch (NullPointerException | IOException e) { - throw new IllegalStateException(e); - } - - // Check the local override file. - String workdir = System.getProperty("user.dir"); - File defaultFile = new File(workdir, OVERRIDE_FILE_NAME); + if (builder == null) { + File defaultFile = new File(OVERRIDE_FILE_NAME); if (defaultFile.exists()) { - try (InputStream is = new FileInputStream(defaultFile)) { - config.load(is); + try { + builder = r.connection(new String(Files.readAllBytes(defaultFile.toPath()), UTF_8)); } catch (IOException e) { throw new IllegalStateException(e); } - } - - // provision connection builder based on configuration - defaultConnectionBuilder = RethinkDB.r.connection(); - // mandatory fields - defaultConnectionBuilder = defaultConnectionBuilder.hostname(config.getProperty(PROP_HOSTNAME).trim()); - defaultConnectionBuilder = defaultConnectionBuilder.port(Integer.parseInt(config.getProperty(PROP_PORT).trim())); - // optional fields - final String authKey = config.getProperty(PROP_AUTHKEY); - if (authKey != null) { - defaultConnectionBuilder.authKey(config.getProperty(PROP_AUTHKEY).trim()); + } else { + builder = r.connection(); } } - - return defaultConnectionBuilder; + return builder; } /** diff --git a/src/test/java/com/rethinkdb/TypesTest.java b/src/test/java/com/rethinkdb/TypesTest.java new file mode 100644 index 00000000..cf71561e --- /dev/null +++ b/src/test/java/com/rethinkdb/TypesTest.java @@ -0,0 +1,105 @@ +package com.rethinkdb; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.rethinkdb.utils.Types; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.*; + +public class TypesTest { + private interface T {} + private interface K {} + private interface V {} + + private static final TypeReference TYPE_REF_T = new TypeReference() {}; + private static final TypeReference TYPE_REF_K = new TypeReference() {}; + private static final TypeReference TYPE_REF_V = new TypeReference() {}; + private static final TypeReference> TYPE_REF_LIST_V = new TypeReference>() {}; + private static final TypeReference> TYPE_REF_LIST_T = new TypeReference>() {}; + private static final TypeReference> TYPE_REF_SET_T = new TypeReference>() {}; + private static final TypeReference> TYPE_REF_MAP_K_V = new TypeReference>() {}; + private static final TypeReference>> TYPE_REF_MAP_K_LIST_V = new TypeReference>>() {}; + + private static void assertType(TypeReference expected, TypeReference actual) { + assertEquals(expected.getType(), actual.getType()); + } + + @Test + public void testOf() { + assertType(TYPE_REF_T, Types.of(T.class)); + } + + @Test(expected = NullPointerException.class) + public void testNullClassOf() { + Types.of(null); + } + + @Test + public void testListOf() { + assertType(TYPE_REF_LIST_T, Types.listOf(T.class)); + assertType(TYPE_REF_LIST_T, Types.listOf(TYPE_REF_T)); + } + + @Test(expected = NullPointerException.class) + public void testNullClassListOf() { + Types.listOf((Class) null); + } + + @Test(expected = NullPointerException.class) + public void testNullTypeListOf() { + Types.listOf((TypeReference) null); + } + + @Test + public void testSetOf() { + assertType(TYPE_REF_SET_T, Types.setOf(T.class)); + assertType(TYPE_REF_SET_T, Types.setOf(TYPE_REF_T)); + } + + @Test(expected = NullPointerException.class) + public void testNullClassSetOf() { + Types.setOf((Class) null); + } + + @Test(expected = NullPointerException.class) + public void testNullTypeSetOf() { + Types.setOf((TypeReference) null); + } + + @Test + public void testMapOf() { + assertType(TYPE_REF_MAP_K_V, Types.mapOf(K.class, V.class)); + assertType(TYPE_REF_MAP_K_V, Types.mapOf(TYPE_REF_K, V.class)); + assertType(TYPE_REF_MAP_K_V, Types.mapOf(K.class, TYPE_REF_V)); + assertType(TYPE_REF_MAP_K_V, Types.mapOf(TYPE_REF_K, TYPE_REF_V)); + + assertType(TYPE_REF_MAP_K_LIST_V, Types.mapOf(K.class, Types.listOf(V.class))); + assertType(TYPE_REF_MAP_K_LIST_V, Types.mapOf(TYPE_REF_K, Types.listOf(TYPE_REF_V))); + assertType(TYPE_REF_MAP_K_LIST_V, Types.mapOf(K.class, TYPE_REF_LIST_V)); + assertType(TYPE_REF_MAP_K_LIST_V, Types.mapOf(TYPE_REF_K, TYPE_REF_LIST_V)); + } + + @Test(expected = NullPointerException.class) + public void testNullClassKeyMapOf() { + Types.mapOf((Class) null, V.class); + } + + @Test(expected = NullPointerException.class) + public void testNullTypeKeyMapOf() { + Types.mapOf((TypeReference) null, V.class); + } + + @Test(expected = NullPointerException.class) + public void testNullClassValueMapOf() { + Types.mapOf(K.class, (Class) null); + } + + @Test(expected = NullPointerException.class) + public void testNullTypeValueMapOf() { + Types.mapOf(K.class, (TypeReference) null); + } +} \ No newline at end of file diff --git a/templates/TopLevel.java b/templates/TopLevel.java index d5656724..3220aac3 100644 --- a/templates/TopLevel.java +++ b/templates/TopLevel.java @@ -1,7 +1,6 @@ <%page args="all_terms" /> package com.rethinkdb.gen.model; -import com.rethinkdb.ast.ReqlAst; import com.rethinkdb.model.Arguments; import com.rethinkdb.model.MapObject; import com.rethinkdb.gen.ast.Error; @@ -9,7 +8,6 @@ import com.rethinkdb.gen.exc.ReqlDriverError; import com.rethinkdb.utils.Internals; -import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -25,7 +23,7 @@ public ReqlExpr row(Object... values) { " Use lambda syntax instead"); } - public static Object pathspec(Object... path) { + public Object pathspec(Object... path) { if (path.length < 2) { throw new ReqlDriverError("r.pathspec(...) requires at least two parameters."); }