diff --git a/.gitignore b/.gitignore index dafefef7..68c356a5 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ virtualenv/ # RethinkDB scripts/*.proto +test-dburl-override.txt # Editors .vscode/ diff --git a/README.md b/README.md index f06e857f..295ee69d 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ [![Maven Central](https://img.shields.io/maven-central/v/com.rethinkdb/rethinkdb-driver)](https://search.maven.org/artifact/com.rethinkdb/rethinkdb-driver) [![Bintray](https://img.shields.io/bintray/v/rethinkdb/maven/rethinkdb-driver)](https://bintray.com/rethinkdb/maven/rethinkdb-driver/_latestVersion) +[![License](https://img.shields.io/github/license/rethinkdb/rethinkdb-java?color=lightgrey)](https://github.com/rethinkdb/rethinkdb-java/tree/master/LICENSE) [![Travis-CI.org](https://img.shields.io/travis/rethinkdb/rethinkdb-java)](https://travis-ci.org/rethinkdb/rethinkdb-java) [![Twitter](https://img.shields.io/twitter/url?style=social&url=https%3A%2F%2Fgithub.com%2Frethinkdb%2Frethinkdb-java)](https://twitter.com/intent/tweet?text=Wow:&url=https%3A%2F%2Fgithub.com%2Frethinkdb%2Frethinkdb-java) @@ -23,14 +24,18 @@ Run `./gradlew assemble` to build the jar or `./gradlew install` to install it i ## Contributing to the driver +If you want to contribute to the driver, make sure to base your branch off of our **develop** branch (or a feature-branch) +and create your PR into that **same** branch. **We will be rejecting any PRs between branches or into release branches!** +It is very possible that your change might already be in development or you missed something. + ### Installation Besides JDK 8, to be able to contribute to the driver, you must also install: * Python **3.6** or **3.7** * PIP3 libraries: - * mako - * rethinkdb + * `mako` + * `rethinkdb` ### Using Gradle @@ -105,6 +110,4 @@ These are also checked into git, so you don't need to run the conversion script This section was moved to separate documentation: -> [How to deploy this repository to Bintray](DEPLOYING-BINTRAY.md) - -> [How to deploy this repository to Maven Central (Sonatype)](DEPLOYING-SONATYPE.md) \ No newline at end of file +> [How to deploy this repository to Bintray with integration with Maven Central (Sonatype)](DEPLOYING.md) diff --git a/build.gradle.kts b/build.gradle.kts index cc54ac80..c3d2c1b4 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -13,7 +13,7 @@ plugins { id("com.jfrog.bintray") version "1.8.4" } -version = "2.4.1.1" +version = "2.4.2" group = "com.rethinkdb" java.sourceCompatibility = JavaVersion.VERSION_1_8 diff --git a/src/main/java/com/rethinkdb/RethinkDB.java b/src/main/java/com/rethinkdb/RethinkDB.java index d6216dbe..79d65668 100644 --- a/src/main/java/com/rethinkdb/RethinkDB.java +++ b/src/main/java/com/rethinkdb/RethinkDB.java @@ -14,7 +14,7 @@ */ public class RethinkDB extends TopLevel { /** - * The Singleton to use to begin interacting with RethinkDB Driver + * The Singleton to use to begin interacting with RethinkDB Driver. */ public static final RethinkDB r = new RethinkDB(); /** @@ -25,7 +25,7 @@ public class RethinkDB extends TopLevel { /** * Gets (or creates, if null) the {@link ObjectMapper} for handling {@link com.rethinkdb.net.Result}'s values. * - * @return the {@link com.rethinkdb.net.Result}'s {@link ObjectMapper} + * @return the {@link com.rethinkdb.net.Result}'s {@link ObjectMapper}. */ public synchronized static @NotNull ObjectMapper getResultMapper() { ObjectMapper mapper = resultMapper; @@ -40,7 +40,7 @@ public class RethinkDB extends TopLevel { /** * Sets the {@link ObjectMapper} for handling {@link com.rethinkdb.net.Result}'s values. * - * @param mapper an {@link ObjectMapper}, or null + * @param mapper an {@link ObjectMapper}, or null. */ public synchronized static void setResultMapper(@Nullable ObjectMapper mapper) { resultMapper = mapper; @@ -49,7 +49,7 @@ public synchronized static void setResultMapper(@Nullable ObjectMapper mapper) { /** * Creates a new connection builder. * - * @return a newly created {@link Connection.Builder} + * @return a newly created {@link Connection.Builder}. */ public @NotNull Connection.Builder connection() { return new Connection.Builder(); @@ -59,7 +59,7 @@ public synchronized static void setResultMapper(@Nullable ObjectMapper mapper) { * Creates a new connection builder and configures it with a db-url. * * @param dburl the db-url to configure the builder. - * @return a newly created {@link Connection.Builder} + * @return a newly created {@link Connection.Builder}. */ public @NotNull Connection.Builder connection(@NotNull String dburl) { return connection(URI.create(dburl)); @@ -69,9 +69,20 @@ public synchronized static void setResultMapper(@Nullable ObjectMapper mapper) { * Creates a new connection builder and configures it with a db-url. * * @param uri the db-url to configure the builder. - * @return a newly created {@link Connection.Builder} + * @return a newly created {@link Connection.Builder}. */ public @NotNull Connection.Builder connection(@NotNull URI uri) { return new Connection.Builder(uri); } + + /** + * Copies a connection builder. + * + * @param b the original builder. + * @return a copy of the {@link Connection.Builder}. + */ + public @NotNull Connection.Builder connection(Connection.Builder b) { + return new Connection.Builder(b); + } + } diff --git a/src/main/java/com/rethinkdb/ast/Query.java b/src/main/java/com/rethinkdb/ast/Query.java index 0056e2cd..d3d5a9ed 100644 --- a/src/main/java/com/rethinkdb/ast/Query.java +++ b/src/main/java/com/rethinkdb/ast/Query.java @@ -1,10 +1,10 @@ package com.rethinkdb.ast; -import com.rethinkdb.RethinkDB; import com.rethinkdb.gen.exc.ReqlRuntimeError; import com.rethinkdb.gen.proto.QueryType; import com.rethinkdb.model.OptArgs; import com.rethinkdb.utils.Internals; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,21 +22,20 @@ public class Query { private static final Logger LOGGER = LoggerFactory.getLogger(Query.class); - public final QueryType type; + public final @NotNull QueryType type; public final long token; - public final OptArgs globalOptions; - public final @Nullable ReqlAst term; + public final @Nullable OptArgs globalOptions; - public Query(QueryType type, long token, @Nullable ReqlAst term, OptArgs globalOptions) { + public Query(@NotNull QueryType type, long token, @Nullable ReqlAst term, @Nullable OptArgs globalOptions) { this.type = type; this.token = token; this.term = term; this.globalOptions = globalOptions; } - public Query(QueryType type, long token) { - this(type, token, null, new OptArgs()); + public Query(@NotNull QueryType type, long token) { + this(type, token, null, null); } public ByteBuffer serialize() { @@ -47,8 +46,8 @@ public ByteBuffer serialize() { if (term != null) { list.add(term.build()); } - if (!globalOptions.isEmpty()) { - list.add(ReqlAst.buildOptarg(globalOptions)); + if (globalOptions != null && !globalOptions.isEmpty()) { + list.add(ReqlAst.buildToMap(globalOptions)); } String json = Internals.getInternalMapper().writeValueAsString(list); byte[] bytes = json.getBytes(StandardCharsets.UTF_8); diff --git a/src/main/java/com/rethinkdb/ast/ReqlAst.java b/src/main/java/com/rethinkdb/ast/ReqlAst.java index ffa0ced7..d7d58411 100644 --- a/src/main/java/com/rethinkdb/ast/ReqlAst.java +++ b/src/main/java/com/rethinkdb/ast/ReqlAst.java @@ -12,6 +12,8 @@ import com.rethinkdb.net.Connection; import com.rethinkdb.net.Result; import com.rethinkdb.utils.Types; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.*; import java.util.Map.Entry; @@ -22,33 +24,27 @@ * Base class for all ReQL queries. */ public class ReqlAst { - protected final TermType termType; - protected final Arguments args; - protected final OptArgs optargs; - - protected ReqlAst(TermType termType, Arguments args, OptArgs optargs) { - if (termType == null) { - throw new ReqlDriverError("termType can't be null!"); - } + protected final @NotNull TermType termType; + protected final @Nullable Arguments args; + protected final @Nullable OptArgs optargs; + + protected ReqlAst(@NotNull TermType termType, @Nullable Arguments args, @Nullable OptArgs optargs) { + //if (termType == null) { + // throw new ReqlDriverError("termType can't be null!"); + //} this.termType = termType; - this.args = args != null ? args : new Arguments(); - this.optargs = optargs != null ? optargs : new OptArgs(); - } - - public static Map buildOptarg(OptArgs opts) { - Map result = new LinkedHashMap<>(opts.size()); - opts.forEach((name, arg) -> result.put(name, arg.build())); - return result; + this.args = args; + this.optargs = optargs; } protected Object build() { - // Create a JSON object from the Ast + // Create a JSON object from the AST // set initial capacity to max size possible, avoids resizing List list = new ArrayList<>(3); list.add(termType.value); - list.add(args.isEmpty() ? Collections.emptyList() : args.stream().map(ReqlAst::build).collect(Collectors.toList())); - if (optargs.size() > 0) { - list.add(buildOptarg(optargs)); + list.add(args == null || args.isEmpty() ? Collections.emptyList() : args.stream().map(ReqlAst::build).collect(Collectors.toList())); + if (optargs != null && !optargs.isEmpty()) { + list.add(buildToMap(optargs)); } return list; } @@ -377,7 +373,7 @@ public CompletableFuture> runAsync(Connection conn, OptArgs runOpt * @return The result of this query */ public Object runAtom(Connection conn) { - return handleAtom(conn.run(this, new OptArgs(), null, null, null)); + return handleAtom(conn.run(this, new OptArgs(), null, false, null)); } /** @@ -388,7 +384,7 @@ public Object runAtom(Connection conn) { * @return The result of this query */ public Object runAtom(Connection conn, OptArgs runOpts) { - return handleAtom(conn.run(this, runOpts, null, null, null)); + return handleAtom(conn.run(this, runOpts, null, false, null)); } /** @@ -400,7 +396,7 @@ public Object runAtom(Connection conn, OptArgs runOpts) { * @return The result of this query */ public Object runAtom(Connection conn, Result.FetchMode fetchMode) { - return handleAtom(conn.run(this, new OptArgs(), fetchMode, null, null)); + return handleAtom(conn.run(this, new OptArgs(), fetchMode, false, null)); } /** @@ -413,7 +409,7 @@ public Object runAtom(Connection conn, Result.FetchMode fetchMode) { * @return The result of this query */ public T runAtom(Connection conn, Class typeRef) { - return handleAtom(conn.run(this, new OptArgs(), null, null, Types.of(typeRef))); + return handleAtom(conn.run(this, new OptArgs(), null, false, Types.of(typeRef))); } /** @@ -426,7 +422,7 @@ public T runAtom(Connection conn, Class typeRef) { * @return The result of this query (either a {@code P or a Cursor

} */ public T runAtom(Connection conn, TypeReference typeRef) { - return handleAtom(conn.run(this, new OptArgs(), null, null, typeRef)); + return handleAtom(conn.run(this, new OptArgs(), null, false, typeRef)); } /** @@ -439,7 +435,7 @@ public T runAtom(Connection conn, TypeReference typeRef) { * @return The result of this query */ public Object runAtom(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { - return handleAtom(conn.run(this, runOpts, fetchMode, null, null)); + return handleAtom(conn.run(this, runOpts, fetchMode, false, null)); } /** @@ -453,7 +449,7 @@ public Object runAtom(Connection conn, OptArgs runOpts, Result.FetchMode fetchMo * @return The result of this query */ public T runAtom(Connection conn, OptArgs runOpts, Class typeRef) { - return handleAtom(conn.run(this, runOpts, null, null, Types.of(typeRef))); + return handleAtom(conn.run(this, runOpts, null, false, Types.of(typeRef))); } /** @@ -467,7 +463,7 @@ public T runAtom(Connection conn, OptArgs runOpts, Class typeRef) { * @return The result of this query */ public T runAtom(Connection conn, OptArgs runOpts, TypeReference typeRef) { - return handleAtom(conn.run(this, runOpts, null, null, typeRef)); + return handleAtom(conn.run(this, runOpts, null, false, typeRef)); } /** @@ -481,7 +477,7 @@ public T runAtom(Connection conn, OptArgs runOpts, TypeReference typeRef) * @return The result of this query */ public T runAtom(Connection conn, Result.FetchMode fetchMode, Class typeRef) { - return handleAtom(conn.run(this, new OptArgs(), fetchMode, null, Types.of(typeRef))); + return handleAtom(conn.run(this, new OptArgs(), fetchMode, false, Types.of(typeRef))); } /** @@ -495,7 +491,7 @@ public T runAtom(Connection conn, Result.FetchMode fetchMode, Class typeR * @return The result of this query */ public T runAtom(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { - return handleAtom(conn.run(this, new OptArgs(), fetchMode, null, typeRef)); + return handleAtom(conn.run(this, new OptArgs(), fetchMode, false, typeRef)); } /** @@ -510,7 +506,7 @@ public T runAtom(Connection conn, Result.FetchMode fetchMode, TypeReference< * @return The result of this query */ public T runAtom(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { - return handleAtom(conn.run(this, runOpts, fetchMode, null, Types.of(typeRef))); + return handleAtom(conn.run(this, runOpts, fetchMode, false, Types.of(typeRef))); } /** @@ -525,7 +521,7 @@ public T runAtom(Connection conn, OptArgs runOpts, Result.FetchMode fetchMod * @return The result of this query */ public T runAtom(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { - return handleAtom(conn.run(this, runOpts, fetchMode, null, typeRef)); + return handleAtom(conn.run(this, runOpts, fetchMode, false, typeRef)); } /** @@ -535,7 +531,7 @@ public T runAtom(Connection conn, OptArgs runOpts, Result.FetchMode fetchMod * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn) { - return conn.runAsync(this, new OptArgs(), null, null, null).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, new OptArgs(), null, false, null).thenApply(ReqlAst::handleAtom); } /** @@ -547,7 +543,7 @@ public CompletableFuture runAtomAsync(Connection conn) { * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts) { - return conn.runAsync(this, runOpts, null, null, null).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, runOpts, null, false, null).thenApply(ReqlAst::handleAtom); } /** @@ -559,7 +555,7 @@ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts) * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, Result.FetchMode fetchMode) { - return conn.runAsync(this, new OptArgs(), fetchMode, null, null).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, new OptArgs(), fetchMode, false, null).thenApply(ReqlAst::handleAtom); } @@ -573,7 +569,7 @@ public CompletableFuture runAtomAsync(Connection conn, Result.FetchMode * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, Class typeRef) { - return conn.runAsync(this, new OptArgs(), null, null, Types.of(typeRef)).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, new OptArgs(), null, false, Types.of(typeRef)).thenApply(ReqlAst::handleAtom); } /** @@ -586,7 +582,7 @@ public CompletableFuture runAtomAsync(Connection conn, Class typeRef) * @return The result of this query (either a {@code P or a Cursor

} */ public CompletableFuture runAtomAsync(Connection conn, TypeReference typeRef) { - return conn.runAsync(this, new OptArgs(), null, null, typeRef).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, new OptArgs(), null, false, typeRef).thenApply(ReqlAst::handleAtom); } /** @@ -599,7 +595,7 @@ public CompletableFuture runAtomAsync(Connection conn, TypeReference t * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode) { - return conn.runAsync(this, runOpts, fetchMode, null, null).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, runOpts, fetchMode, false, null).thenApply(ReqlAst::handleAtom); } /** @@ -613,7 +609,7 @@ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, Class typeRef) { - return conn.runAsync(this, runOpts, null, null, Types.of(typeRef)).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, runOpts, null, false, Types.of(typeRef)).thenApply(ReqlAst::handleAtom); } /** @@ -627,7 +623,7 @@ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, C * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, TypeReference typeRef) { - return conn.runAsync(this, runOpts, null, null, typeRef).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, runOpts, null, false, typeRef).thenApply(ReqlAst::handleAtom); } /** @@ -641,7 +637,7 @@ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, T * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, Result.FetchMode fetchMode, Class typeRef) { - return conn.runAsync(this, new OptArgs(), fetchMode, null, Types.of(typeRef)).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, new OptArgs(), fetchMode, false, Types.of(typeRef)).thenApply(ReqlAst::handleAtom); } /** @@ -655,7 +651,7 @@ public CompletableFuture runAtomAsync(Connection conn, Result.FetchMode f * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, Result.FetchMode fetchMode, TypeReference typeRef) { - return conn.runAsync(this, new OptArgs(), fetchMode, null, typeRef).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, new OptArgs(), fetchMode, false, typeRef).thenApply(ReqlAst::handleAtom); } /** @@ -670,7 +666,7 @@ public CompletableFuture runAtomAsync(Connection conn, Result.FetchMode f * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class typeRef) { - return conn.runAsync(this, runOpts, fetchMode, null, Types.of(typeRef)).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, runOpts, fetchMode, false, Types.of(typeRef)).thenApply(ReqlAst::handleAtom); } /** @@ -685,7 +681,7 @@ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, R * @return The result of this query */ public CompletableFuture runAtomAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference typeRef) { - return conn.runAsync(this, runOpts, fetchMode, null, typeRef).thenApplyAsync(ReqlAst::handleAtom); + return conn.runAsync(this, runOpts, fetchMode, false, typeRef).thenApply(ReqlAst::handleAtom); } /** @@ -1016,7 +1012,7 @@ public Map> runGrouping(Connection conn, OptArgs runOpts, Resul * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, Class keyRef, Class valueRef) { - return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1031,7 +1027,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, TypeReference keyRef, Class valueRef) { - return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1046,7 +1042,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query (either a {@code P or a Cursor

} */ public CompletableFuture>> runGroupingAsync(Connection conn, Class keyRef, TypeReference valueRef) { - return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1061,7 +1057,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query (either a {@code P or a Cursor

} */ public CompletableFuture>> runGroupingAsync(Connection conn, TypeReference keyRef, TypeReference valueRef) { - return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1075,7 +1071,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query (either a {@code P or a Cursor

} */ public CompletableFuture>> runGroupingAsync(Connection conn, TypeReference> typeRef) { - return conn.runAsync(this, new OptArgs(), null, true, typeRef).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), null, true, typeRef).thenApply(ReqlAst::handleGrouping); } /** @@ -1091,7 +1087,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Class keyRef, Class valueRef) { - return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1107,7 +1103,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, TypeReference keyRef, Class valueRef) { - return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1123,7 +1119,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Class keyRef, TypeReference valueRef) { - return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1139,7 +1135,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, TypeReference keyRef, TypeReference valueRef) { - return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, null, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1154,7 +1150,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, TypeReference> typeRef) { - return conn.runAsync(this, runOpts, null, true, typeRef).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, null, true, typeRef).thenApply(ReqlAst::handleGrouping); } /** @@ -1170,7 +1166,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, Class keyRef, Class valueRef) { - return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1186,7 +1182,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, TypeReference keyRef, Class valueRef) { - return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1202,7 +1198,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, Class keyRef, TypeReference valueRef) { - return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1218,7 +1214,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, TypeReference keyRef, TypeReference valueRef) { - return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1233,7 +1229,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, Result.FetchMode fetchMode, TypeReference> typeRef) { - return conn.runAsync(this, new OptArgs(), fetchMode, true, typeRef).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, new OptArgs(), fetchMode, true, typeRef).thenApply(ReqlAst::handleGrouping); } /** @@ -1250,7 +1246,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class keyRef, Class valueRef) { - return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1267,7 +1263,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference keyRef, Class valueRef) { - return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1284,7 +1280,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, Class keyRef, TypeReference valueRef) { - return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1301,7 +1297,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference keyRef, TypeReference valueRef) { - return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, fetchMode, true, Types.groupOf(keyRef, valueRef)).thenApply(ReqlAst::handleGrouping); } /** @@ -1317,7 +1313,7 @@ public CompletableFuture>> runGroupingAsync(Connection conn * @return The result of this query */ public CompletableFuture>> runGroupingAsync(Connection conn, OptArgs runOpts, Result.FetchMode fetchMode, TypeReference> typeRef) { - return conn.runAsync(this, runOpts, fetchMode, true, typeRef).thenApplyAsync(ReqlAst::handleGrouping); + return conn.runAsync(this, runOpts, fetchMode, true, typeRef).thenApply(ReqlAst::handleGrouping); } /** @@ -1726,15 +1722,17 @@ private void astToString(StringBuilder builder, String name, String indent, bool builder.append('(').append(length).append(length != 1 ? " bytes" : " byte").append(")"); } builder.append('\n'); - Iterator argsIterator = args.iterator(); - while (argsIterator.hasNext()) { - ReqlAst arg = argsIterator.next(); - arg.astToString(builder, null, - indent + (tail ? " " : "│ "), - !argsIterator.hasNext() && optargs.isEmpty()); + if (args != null) { + Iterator argsIterator = args.iterator(); + while (argsIterator.hasNext()) { + ReqlAst arg = argsIterator.next(); + arg.astToString(builder, null, + indent + (tail ? " " : "│ "), + !argsIterator.hasNext() && (optargs == null || optargs.isEmpty())); + } } - if (!optargs.isEmpty()) { + if (optargs != null && !optargs.isEmpty()) { builder.append(indent).append(tail ? " " : "│ ").append("└── ").append(": \n"); Iterator> optIterator = optargs.entrySet().iterator(); while (optIterator.hasNext()) { @@ -1746,6 +1744,12 @@ private void astToString(StringBuilder builder, String name, String indent, bool } } + static Map buildToMap(OptArgs opts) { + Map result = new LinkedHashMap<>(opts.size()); + opts.forEach((name, arg) -> result.put(name, arg.build())); + return result; + } + private static T handleAtom(Result result) { if (!result.responseType().equals(ResponseType.SUCCESS_ATOM)) { throw new IllegalStateException("result is not an atom."); diff --git a/src/main/java/com/rethinkdb/gen/model/TopLevel.java b/src/main/java/com/rethinkdb/gen/model/TopLevel.java index eb3ffb25..f12f6f1c 100644 --- a/src/main/java/com/rethinkdb/gen/model/TopLevel.java +++ b/src/main/java/com/rethinkdb/gen/model/TopLevel.java @@ -5,7 +5,6 @@ package com.rethinkdb.gen.model; -import com.rethinkdb.ast.ReqlAst; import com.rethinkdb.model.Arguments; import com.rethinkdb.model.MapObject; import com.rethinkdb.gen.ast.Error; @@ -13,7 +12,6 @@ import com.rethinkdb.gen.exc.ReqlDriverError; import com.rethinkdb.utils.Internals; -import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -29,7 +27,7 @@ public ReqlExpr row(Object... values) { " Use lambda syntax instead"); } - public static Object pathspec(Object... path) { + public Object pathspec(Object... path) { if (path.length < 2) { throw new ReqlDriverError("r.pathspec(...) requires at least two parameters."); } diff --git a/src/main/java/com/rethinkdb/model/OptArgs.java b/src/main/java/com/rethinkdb/model/OptArgs.java index d2d4535e..05175080 100644 --- a/src/main/java/com/rethinkdb/model/OptArgs.java +++ b/src/main/java/com/rethinkdb/model/OptArgs.java @@ -23,9 +23,12 @@ public OptArgs with(String key, List value) { } public static OptArgs fromMap(Map map) { - OptArgs oa = new OptArgs(); - oa.putAll(map); - return oa; + if (map == null) { + return new OptArgs(); + } + OptArgs args = new OptArgs(); + args.putAll(map); + return args; } public static OptArgs of(String key, Object val) { diff --git a/src/main/java/com/rethinkdb/net/Connection.java b/src/main/java/com/rethinkdb/net/Connection.java index 926ad85e..3a4d5035 100644 --- a/src/main/java/com/rethinkdb/net/Connection.java +++ b/src/main/java/com/rethinkdb/net/Connection.java @@ -6,10 +6,11 @@ import com.rethinkdb.gen.ast.Db; import com.rethinkdb.gen.exc.ReqlDriverError; import com.rethinkdb.gen.exc.ReqlError; +import com.rethinkdb.gen.model.TopLevel; import com.rethinkdb.gen.proto.ResponseType; -import com.rethinkdb.model.Arguments; import com.rethinkdb.model.OptArgs; import com.rethinkdb.model.Server; +import com.rethinkdb.net.Result.FetchMode; import com.rethinkdb.utils.Internals; import com.rethinkdb.utils.Types; import org.jetbrains.annotations.NotNull; @@ -18,16 +19,22 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; -import java.io.*; +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; import java.net.URI; +import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; /** * A single connection to RethinkDB. @@ -35,47 +42,48 @@ * This object is thread-safe. */ public class Connection implements Closeable { - private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class); + private static final @NotNull Logger LOGGER = LoggerFactory.getLogger(Connection.class); - protected final ConnectionSocket.Factory socketFactory; - protected final ResponsePump.Factory pumpFactory; - protected final String hostname; + protected final @NotNull String hostname; protected final int port; - protected final @Nullable SSLContext sslContext; - protected final @Nullable Long timeout; protected final @Nullable String user; protected final @Nullable String password; - protected final Result.FetchMode defaultFetchMode; + protected final @Nullable Long timeout; + protected final @Nullable SSLContext sslContext; + //java-only + protected final @NotNull ConnectionSocket.Factory socketFactory; + protected final @NotNull ResponsePump.Factory pumpFactory; + protected final @NotNull FetchMode defaultFetchMode; + protected final boolean unwrapLists; + protected final boolean persistentThreads; - protected final AtomicLong nextToken = new AtomicLong(); - protected final Set> tracked = ConcurrentHashMap.newKeySet(); - protected final Lock writeLock = new ReentrantLock(); + protected final @NotNull AtomicLong nextToken = new AtomicLong(); + protected final @NotNull Set> tracked = ConcurrentHashMap.newKeySet(); + protected final @NotNull Lock writeLock = new ReentrantLock(); protected @Nullable String dbname; protected @Nullable ConnectionSocket socket; protected @Nullable ResponsePump pump; - protected boolean unwrapLists; /** * Creates a new connection based on a {@link Builder}. * * @param b the connection builder */ - public Connection(Builder b) { - if (b.authKey != null && b.user != null) { - throw new ReqlDriverError("Either `authKey` or `user` can be used, but not both."); - } - this.socketFactory = b.socketFactory != null ? b.socketFactory : DefaultConnectionFactory.INSTANCE; - this.pumpFactory = b.pumpFactory != null ? b.pumpFactory : DefaultConnectionFactory.INSTANCE; + public Connection(@NotNull Builder b) { this.hostname = b.hostname != null ? b.hostname : "127.0.0.1"; this.port = b.port != null ? b.port : 28015; + this.user = b.user != null ? b.user : "admin"; + this.password = b.password != null ? b.password : ""; this.dbname = b.dbname; - this.sslContext = b.sslContext; this.timeout = b.timeout; - this.user = b.user != null ? b.user : "admin"; - this.password = b.password != null ? b.password : b.authKey != null ? b.authKey : ""; + this.sslContext = b.sslContext; + //java-only + this.socketFactory = b.socketFactory != null ? b.socketFactory : DefaultConnectionFactory.INSTANCE; + this.pumpFactory = b.pumpFactory != null ? b.pumpFactory : DefaultConnectionFactory.INSTANCE; this.unwrapLists = b.unwrapLists; - this.defaultFetchMode = b.defaultFetchMode != null ? b.defaultFetchMode : Result.FetchMode.LAZY; + this.defaultFetchMode = b.defaultFetchMode != null ? b.defaultFetchMode : FetchMode.LAZY; + this.persistentThreads = b.persistentThreads; } /** @@ -100,11 +108,6 @@ public Connection(Builder b) { return this; } - public @NotNull Connection unwrapLists(boolean val) { - unwrapLists = val; - return this; - } - /** * Checks if the connection is open. * @@ -114,20 +117,57 @@ public boolean isOpen() { return socket != null && socket.isOpen() && pump != null && pump.isAlive(); } + /** + * Begins the socket connection to the server asynchronously. + * + * @return a {@link CompletableFuture} which completes with itself, once connected. + */ + public @NotNull CompletableFuture connectAsync() { + if (socket != null) { + throw new ReqlDriverError("Client already connected!"); + } + return createSocketAsync().thenApply(socket -> { + this.socket = socket; + HandshakeProtocol.doHandshake(socket, user, password, timeout); + this.pump = pumpFactory.newPump(socket, !persistentThreads); + return this; + }); + } + /** * Begins the socket connection to the server. * * @return itself, once connected. */ public @NotNull Connection connect() { - if (socket != null) { - throw new ReqlDriverError("Client already connected!"); + try { + return connectAsync().join(); + } catch (CompletionException ce) { + Throwable t = ce.getCause(); + if (t instanceof ReqlError) { + throw ((ReqlError) t); + } + throw new ReqlDriverError(t); } - ConnectionSocket socket = socketFactory.newSocket(hostname, port, sslContext, timeout); - this.socket = socket; - HandshakeProtocol.doHandshake(socket, user, password, timeout); - pump = pumpFactory.newPump(socket); - return this; + } + + /** + * Closes and reconnects to the server. + * + * @return a {@link CompletableFuture} which completes with itself, once reconnected. + */ + public @NotNull CompletableFuture reconnectAsync() { + return reconnectAsync(true); + } + + /** + * Closes and reconnects to the server asynchronously. + * + * @param noreplyWait if closing should send a {@link Connection#noreplyWait()} before closing. + * @return a {@link CompletableFuture} which completes with itself, once reconnected. + */ + public @NotNull CompletableFuture reconnectAsync(boolean noreplyWait) { + return closeAsync(noreplyWait).thenCompose(v -> connectAsync()); } /** @@ -146,9 +186,15 @@ public boolean isOpen() { * @return itself, once reconnected. */ public @NotNull Connection reconnect(boolean noreplyWait) { - close(noreplyWait); - connect(); - return this; + try { + return reconnectAsync(noreplyWait).join(); + } catch (CompletionException ce) { + Throwable t = ce.getCause(); + if (t instanceof ReqlError) { + throw ((ReqlError) t); + } + throw new ReqlDriverError(t); + } } /** @@ -165,7 +211,7 @@ public boolean isOpen() { */ public @NotNull CompletableFuture> runAsync(@NotNull ReqlAst term, @NotNull OptArgs optArgs, - @Nullable Result.FetchMode fetchMode, + @Nullable FetchMode fetchMode, @Nullable Boolean unwrap, @Nullable TypeReference typeRef) { handleOptArgs(optArgs); @@ -184,13 +230,13 @@ public boolean isOpen() { * @param term The ReQL term * @param optArgs The options to run this query with * @param fetchMode The fetch mode to use in partial sequences - * @param unwrap + * @param unwrap Override for the connection's unwrapLists setting * @param typeRef The type to convert to * @return The result of this query */ public @NotNull Result run(@NotNull ReqlAst term, @NotNull OptArgs optArgs, - @Nullable Result.FetchMode fetchMode, + @Nullable FetchMode fetchMode, @Nullable Boolean unwrap, @Nullable TypeReference typeRef) { try { @@ -271,6 +317,25 @@ public void runNoReply(@NotNull ReqlAst term, @NotNull OptArgs optArgs) { runQueryNoreply(Query.createStart(nextToken.incrementAndGet(), term, optArgs)); } + /** + * Closes this connection asynchronously, closing all {@link Result}s, the {@link ResponsePump} and the {@link ConnectionSocket}. + * + * @return a {@link CompletableFuture} which completes when everything is closed. + */ + public @NotNull CompletableFuture closeAsync() { + return closeAsync(true); + } + + /** + * Closes this connection asynchronously, closing all {@link Result}s, the {@link ResponsePump} and the {@link ConnectionSocket}. + * + * @param shouldNoreplyWait If the connection should noreply_wait before closing + * @return a {@link CompletableFuture} which completes when everything is closed. + */ + public @NotNull CompletableFuture closeAsync(boolean shouldNoreplyWait) { + return CompletableFuture.runAsync(() -> this.close(shouldNoreplyWait)); + } + /** * Closes this connection, closing all {@link Result}s, the {@link ResponsePump} and the {@link ConnectionSocket}. */ @@ -322,6 +387,11 @@ public void closeResults() { // protected methods + /** + * Sends a STOP query. Used by {@link Result} partial sequences. + * + * @param token the response token. + */ protected void sendStop(long token) { // While the server does reply to the stop request, we ignore that reply. // This works because the response pump in `connect` ignores replies for which @@ -329,16 +399,33 @@ protected void sendStop(long token) { runQueryNoreply(Query.createStop(token)); } - + /** + * Sends a CONTINUE query. Used by {@link Result} partial sequences. + * + * @param token the response token. + * @return a completable future which completes with the next response. + */ protected @NotNull CompletableFuture sendContinue(long token) { return sendQuery(Query.createContinue(token)); } - protected void loseTrackOf(@NotNull Result r) { + /** + * Callback method from {@link Result}, signals that this connection should keep track of this result. + * Tracked results can be closed using {@link Connection#close()} or {@link Connection#closeResults()}. + * + * @param r the result to be tracked. + */ + protected void keepTrackOf(@NotNull Result r) { tracked.add(r); } - protected void keepTrackOf(@NotNull Result r) { + /** + * Callback method from {@link Result}, signals that this connection should no long keep track of this result. + * The result probably finished or was closed. + * + * @param r the result to be tracked. + */ + protected void loseTrackOf(@NotNull Result r) { tracked.remove(r); } @@ -390,8 +477,20 @@ protected void runQueryNoreply(@NotNull Query query) { } } + /** + * Sents a query to the server and returns a {@link CompletableFuture} which completes with a result. + * Runs a ReQL query with options {@code optArgs}, the specified {@code fetchMode} and returns the result, with the + * values converted to the type of {@code TypeReference} + * + * @param The type of result + * @param query The query + * @param fetchMode The fetch mode to use in partial sequences + * @param unwrap Override for the connection's unwrapLists setting + * @param typeRef The type to convert to + * @return The {@link CompletableFuture} which completes with the result of the query. + */ protected @NotNull CompletableFuture> runQuery(@NotNull Query query, - @Nullable Result.FetchMode fetchMode, + @Nullable FetchMode fetchMode, @Nullable Boolean unwrap, @Nullable TypeReference typeRef) { return sendQuery(query).thenApply(res -> new Result<>( @@ -401,16 +500,32 @@ protected void runQueryNoreply(@NotNull Query query) { typeRef)); } + /** + * Handle optArgs before sending to the server. + * + * @param optArgs the optArgs. + */ protected void handleOptArgs(@NotNull OptArgs optArgs) { - if (!optArgs.containsKey("db") && dbname != null) { + if (optArgs.containsKey("db")) { + // The db arg must be wrapped in a db ast object + optArgs.with("db", new Db(optArgs.get("db"))); + } else if (dbname != null) { // Only override the db global arg if the user hasn't // specified one already and one is specified on the connection - optArgs.with("db", dbname); + optArgs.with("db", new Db(dbname)); } - if (optArgs.containsKey("db")) { - // The db arg must be wrapped in a db ast object - optArgs.with("db", new Db(Arguments.make(optArgs.get("db")))); + } + + /** + * Detects if the connection socket supports async creation or wraps it before returning. + * + * @return a {@link CompletableFuture} which will complete with a new {@link ConnectionSocket}. + */ + protected @NotNull CompletableFuture createSocketAsync() { + if (socketFactory instanceof ConnectionSocket.AsyncFactory) { + return ((ConnectionSocket.AsyncFactory) socketFactory).newSocketAsync(hostname, port, sslContext, timeout); } + return CompletableFuture.supplyAsync(() -> socketFactory.newSocket(hostname, port, sslContext, timeout)); } // builder @@ -419,22 +534,31 @@ protected void handleOptArgs(@NotNull OptArgs optArgs) { * Builder should be used to build a Connection instance. */ public static class Builder { - private @Nullable ConnectionSocket.Factory socketFactory; - private @Nullable ResponsePump.Factory pumpFactory; private @Nullable String hostname; private @Nullable Integer port; - private @Nullable String dbname; - private @Nullable SSLContext sslContext; - private @Nullable Long timeout; - private @Nullable String authKey; private @Nullable String user; private @Nullable String password; - private @Nullable Result.FetchMode defaultFetchMode; + private @Nullable String dbname; + private @Nullable Long timeout; + private @Nullable SSLContext sslContext; + // java-only + private @Nullable ConnectionSocket.Factory socketFactory; + private @Nullable ResponsePump.Factory pumpFactory; + private @Nullable FetchMode defaultFetchMode; private boolean unwrapLists = false; + private boolean persistentThreads = false; + /** + * Creates an empty builder. + */ public Builder() { } + /** + * Parses a db-ul as a builder. + * + * @param uri the db-url to parse. + */ public Builder(@NotNull URI uri) { Objects.requireNonNull(uri, "URI can't be null. Use the default constructor instead."); if (!"rethinkdb".equals(uri.getScheme())) { @@ -465,7 +589,7 @@ public Builder(@NotNull URI uri) { if (port != -1) { this.port = port; } - if (path != null) { + if (path != null && !path.isEmpty()) { if (path.charAt(0) == '/') { path = path.substring(1); } @@ -479,32 +603,25 @@ public Builder(@NotNull URI uri) { int i = kv.indexOf('='); String k = i != -1 ? kv.substring(0, i) : kv; String v = i != -1 ? kv.substring(i + 1) : ""; + boolean booleanValue = v.isEmpty() || "true".equals(v) || "enabled".equals(v); switch (k) { - case "auth_key": - case "authKey": { - String authKey = v; - if (authKey.isEmpty()) { - LOGGER.debug("Ignoring empty '{}'", v); - break; - } - if (authKey.charAt(0) == '\'' && authKey.charAt(authKey.length() - 1) == '\'') { - authKey = authKey.substring(1, authKey.length() - 1).replace("\\'", "'"); - } - this.authKey = authKey; - break; - } case "timeout": { this.timeout = Long.parseLong(v); break; } case "java.default_fetch_mode": case "java.defaultFetchMode": { - this.defaultFetchMode = Result.FetchMode.fromString(v); + this.defaultFetchMode = FetchMode.fromString(v); break; } case "java.unwrap_lists": case "java.unwrapLists": { - this.unwrapLists = v.isEmpty() || "true".equals(v) || "enabled".equals(v); + this.unwrapLists = booleanValue; + break; + } + case "java.persistent_threads": + case "java.persistentThreads": { + this.persistentThreads = booleanValue; break; } default: { @@ -515,103 +632,315 @@ public Builder(@NotNull URI uri) { } } + /** + * Creates a copy of this builder. + * + * @return a copy of this builder. + * @deprecated Use {@link com.rethinkdb.RethinkDB#connection(Builder) r.connection(Builder)} instead. + * (Will be removed on v2.5.0) + */ + @Deprecated public @NotNull Builder copyOf() { - Builder c = new Builder(); - c.socketFactory = socketFactory; - c.pumpFactory = pumpFactory; - c.hostname = hostname; - c.port = port; - c.dbname = dbname; - c.sslContext = sslContext; - c.timeout = timeout; - c.authKey = authKey; - c.user = user; - c.password = password; - c.unwrapLists = unwrapLists; - c.defaultFetchMode = defaultFetchMode; - return c; - } - - public @NotNull Builder socketFactory(@Nullable ConnectionSocket.Factory factory) { - socketFactory = factory; - return this; + return new Builder(this); + } + + /** + * Copies a connection builder. + * + * @param b the original builder. + */ + public Builder(@NotNull Builder b) { + hostname = b.hostname; + port = b.port; + user = b.user; + password = b.password; + dbname = b.dbname; + timeout = b.timeout; + sslContext = b.sslContext; + // java-only + socketFactory = b.socketFactory; + pumpFactory = b.pumpFactory; + unwrapLists = b.unwrapLists; + defaultFetchMode = b.defaultFetchMode; + persistentThreads = b.persistentThreads; } - public @NotNull Builder pumpFactory(@Nullable ResponsePump.Factory factory) { - pumpFactory = factory; + /** + * Sets a custom hostname for the connection. + *

(Configurable by Db-url)

+ * + * @param hostname the hostname, or {@code null}. + * @return itself. + */ + public @NotNull Builder hostname(@Nullable String hostname) { + this.hostname = hostname; return this; } - public @NotNull Builder hostname(@Nullable String val) { - hostname = val; + /** + * Sets a custom port for the connection. + *

(Configurable by Db-url)

+ * + * @param port the port, or {@code null}. + * @return itself. + */ + public @NotNull Builder port(@Nullable Integer port) { + this.port = port; return this; } - public @NotNull Builder port(@Nullable Integer val) { - port = val; + /** + * Sets a custom username for the connection. + *

(Configurable by Db-url)

+ * + * @param user the username, or {@code null}. + * @return itself. + */ + public @NotNull Builder user(@Nullable String user) { + this.user = user; return this; } - public @NotNull Builder db(@Nullable String val) { - dbname = val; + /** + * Sets a custom username and password for the connection. + *

(Configurable by Db-url)

+ * + * @param user the username, or {@code null}. + * @param password the password, or {@code null}. + * @return itself. + */ + public @NotNull Builder user(@Nullable String user, @Nullable String password) { + this.user = user; + this.password = password; return this; } - public @NotNull Builder authKey(@Nullable String key) { - authKey = key; + /** + * Sets a custom database for the connection. + *

(Configurable by Db-url)

+ * + * @param dbname the database name, or {@code null}. + * @return itself. + */ + public @NotNull Builder db(@Nullable String dbname) { + this.dbname = dbname; return this; } - public @NotNull Builder user(@Nullable String user, @Nullable String password) { - this.user = user; - this.password = password; + /** + * Sets a custom timeout for the connection. + *

(Db-url key: "timeout")

+ * + * @param timeout the timeout, or {@code null}. + * @return itself. + */ + public @NotNull Builder timeout(@Nullable Long timeout) { + this.timeout = timeout; return this; } - public @NotNull Builder certFile(@NotNull File val) { - try (InputStream stream = new FileInputStream(val)) { - return sslContext(Internals.readCertFile(stream)); - } catch (IOException e) { - throw new RuntimeException(e); - } + /** + * Sets a custom authentication key for the connection. + *

(No db-url support)

+ * + * @param authKey the authentication key, or {@code null}. + * @return itself. + * @deprecated Use {@link Builder#user(String, String)} instead. + * (Will be removed on v2.5.0) + */ + @Deprecated + public @NotNull Builder authKey(@Nullable String authKey) { + return user(null, authKey); } - public @NotNull Builder certFile(@NotNull InputStream val) { - try (InputStream stream = val) { + /** + * Sets a certificate to provide SSL encryption to the RethinkDB Connection. + *

(No db-url support)

+ * + * @param source a callable which provides a {@link InputStream} with the contents of a certificate file. + * @return itself. + */ + public @NotNull Builder certFile(@NotNull Callable source) { + try (InputStream stream = source.call()) { return sslContext(Internals.readCertFile(stream)); - } catch (IOException e) { + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { throw new RuntimeException(e); } } - public @NotNull Builder sslContext(@Nullable SSLContext val) { - sslContext = val; + /** + * Sets a certificate to provide SSL encryption to the RethinkDB Connection. + *

(No db-url support)

+ * + * @param source a {@link InputStream} with the contents of a certificate file. + * @return itself. + */ + public @NotNull Builder certFile(@NotNull InputStream source) { + return certFile(() -> source); + } + + /** + * Sets a certificate to provide SSL encryption to the RethinkDB Connection. + *

(No db-url support)

+ * + * @param file a certificate file to read from. + * @return itself. + */ + public @NotNull Builder certFile(@NotNull File file) { + return certFile(() -> new FileInputStream(file)); + } + + /** + * Sets a {@link SSLContext} to provide SSL encryption to the RethinkDB Connection. + *

(No db-url support)

+ * + * @param sslContext the SSL context, or {@code null}. + * @return itself. + */ + public @NotNull Builder sslContext(@Nullable SSLContext sslContext) { + this.sslContext = sslContext; return this; } - public @NotNull Builder unwrapLists(boolean val) { - unwrapLists = val; + /** + * Sets a custom {@link ConnectionSocket} factory for the connection. + *

(Java Driver-specific, No db-url support)

+ * + * @param socketFactory the connection socket factory, or {@code null}. + * @return itself. + */ + public @NotNull Builder socketFactory(@Nullable ConnectionSocket.Factory socketFactory) { + this.socketFactory = socketFactory; return this; } - public @NotNull Builder defaultFetchMode(@Nullable Result.FetchMode val) { - defaultFetchMode = val; + /** + * Sets a custom {@link ResponsePump} factory for the connection. + *

(Java Driver-specific, No db-url support)

+ * + * @param pumpFactory the response pump factory, or {@code null}. + * @return itself. + */ + public @NotNull Builder pumpFactory(@Nullable ResponsePump.Factory pumpFactory) { + this.pumpFactory = pumpFactory; return this; } - public @NotNull Builder timeout(@Nullable Long val) { - timeout = val; + /** + * Sets the default fetch mode for sequences. + * + *
+ * Fetch mode is a Java-driver specific behaviour that allows for fine-tuning on partial sequence fetching. + *

+ * Can be used to balance between high availability and network optimization. The + * {@linkplain FetchMode#AGGRESSIVE aggressive} fetch mode will make best effort to consume the entire sequence, as + * fast as possible, to ensure high availability to the consumer, while the {@linkplain FetchMode#LAZY lazy} + * fetch mode will make no effort and await until all objects were consumed before fetching the next one.
+ * In addiction, there are many preemptive fetch modes, which will consume the next sequence once the buffer + * reaches {@linkplain FetchMode#PREEMPTIVE_HALF half}, a {@linkplain FetchMode#PREEMPTIVE_THIRD third}, + * a {@linkplain FetchMode#PREEMPTIVE_FOURTH fourth}, a {@linkplain FetchMode#PREEMPTIVE_FIFTH fitfh}, + * a {@linkplain FetchMode#PREEMPTIVE_SIXTH sixth}, a {@linkplain FetchMode#PREEMPTIVE_SEVENTH seventh} or + * an {@linkplain FetchMode#PREEMPTIVE_EIGHTH eighth} of it's capacity. + *

+ * + *

(Java Driver-specific, Db-url key: "java.default_fetch_mode")

+ * + * @param defaultFetchMode a default fetch mode, or {@code null}. + * @return itself. + */ + public @NotNull Builder defaultFetchMode(@Nullable FetchMode defaultFetchMode) { + this.defaultFetchMode = defaultFetchMode; return this; } + /** + * Sets list unwrapping behaviour for lists. + * + *
+ * List unwrapping is a Java-driver specific behaviour that unwraps an atom response from the server, + * which is a list, as if it were a sequence of objects. + *

+ * Consider the following: + * {@link TopLevel#expr(Object) r.expr}({@link TopLevel#array(Object, Object...) r.array("a", "b", "c")}).{@link com.rethinkdb.gen.ast.ReqlExpr#run(Connection) run(conn)}; + *

+ * By default, it returns a {@link Result} with a single {@link List}["a","b","c"] inside. + *

+ * With list unwrapping, it returns a {@link Result} with "a", "b", "c" inside. + *

+ * The feature makes the code a bit less verbose. For example, iterating goes from: + *

+ * (({@code List}) {@link Result result}{@link Result#single() .single()}){@link List#forEach(Consumer) .forEach(s -> ...)} + *

+ * To: + *

+ * {@link Result result}{@link Result#forEach(Consumer) .forEach(s -> ...)} + *

+ * + *

(Java Driver-specific, Db-url key: "java.unwrap_lists")

+ * + * @param enabled {@code true} to enable list unwrapping, {@code false} to disable. + * @return itself. + */ + public @NotNull Builder unwrapLists(boolean enabled) { + unwrapLists = enabled; + return this; + } + + /** + * Sets if the response pump should use {@linkplain Thread#setDaemon(boolean) daemon threads} or not.
+ *
+ * Using persistent threads guarantees that the JVM will not exit once the main thread finishes, but will keep + * the JVM alive if the connection is not closed.
+ * Daemon threads will ensure that the JVM can exit automatically, but may or may not ignore ongoing + * asynchronous queries. Your milage may vary. (HTTP or other application frameworks may open persistent + * threads by themselves and may keep the JVM alive until the main window or application shuts down.) + *
+ * + *

(Java Driver-specific, Db-url key: "java.default_fetch_mode")

+ * + * @param enabled {@code true} to use persistent threads in the response pump, {@code false} to use daemon threads. + * @return itself. + * @see Thread#setDaemon(boolean) + */ + public @NotNull Builder persistentThreads(boolean enabled) { + persistentThreads = enabled; + return this; + } + + /** + * Creates a new connection and connects asynchronously to the server. + * + * @return a {@link CompletableFuture} which completes with the connection once connected. + */ + public @NotNull CompletableFuture connectAsync() { + return new Connection(this).connectAsync(); + } + + /** + * Creates a new connection and connects to the server. + * + * @return a newly created connection, connected to the server. + */ public @NotNull Connection connect() { return new Connection(this).connect(); } + /** + * Creates a {@link URI} with the db-url representing this connection builder. + * + * @return the db-url as a {@link URI}. + */ public @NotNull URI dbUrl() { return URI.create(dbUrlString()); } + /** + * Creates a db-url representing this connection builder. + * + * @return the db-url. + */ public @NotNull String dbUrlString() { StringBuilder b = new StringBuilder("rethinkdb://"); @@ -634,14 +963,8 @@ public Builder(@NotNull URI uri) { } boolean first = true; - if (authKey != null) { - first = false; - b.append('?'); - - b.append("auth_key=").append(authKey); - } if (timeout != null) { - b.append(first ? '?' : "&"); + b.append('?'); first = false; b.append("timeout=").append(timeout); @@ -655,8 +978,15 @@ public Builder(@NotNull URI uri) { if (unwrapLists) { b.append(first ? '?' : "&"); first = false; + b.append("java.unwrap_lists=true"); } + if (persistentThreads) { + b.append(first ? '?' : "&"); + + first = false; + b.append("java.persistent_threads=true"); + } return b.toString(); } @@ -666,24 +996,37 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Builder builder = (Builder) o; - return unwrapLists == builder.unwrapLists && - Objects.equals(socketFactory, builder.socketFactory) && - Objects.equals(pumpFactory, builder.pumpFactory) && - Objects.equals(hostname, builder.hostname) && + return Objects.equals(hostname, builder.hostname) && Objects.equals(port, builder.port) && - Objects.equals(dbname, builder.dbname) && - Objects.equals(sslContext, builder.sslContext) && - Objects.equals(timeout, builder.timeout) && - Objects.equals(authKey, builder.authKey) && Objects.equals(user, builder.user) && Objects.equals(password, builder.password) && - defaultFetchMode == builder.defaultFetchMode; + Objects.equals(dbname, builder.dbname) && + Objects.equals(timeout, builder.timeout) && + Objects.equals(sslContext, builder.sslContext) && + Objects.equals(socketFactory, builder.socketFactory) && + Objects.equals(pumpFactory, builder.pumpFactory) && + Objects.equals(defaultFetchMode, builder.defaultFetchMode) && + unwrapLists == builder.unwrapLists && + persistentThreads == builder.persistentThreads; } + /** + * {@inheritDoc} + */ @Override public int hashCode() { - return Objects.hash(socketFactory, pumpFactory, hostname, port, dbname, - sslContext, timeout, authKey, user, password, defaultFetchMode, unwrapLists); + return Objects.hash( + hostname, port, user, password, dbname, timeout, sslContext, + socketFactory, pumpFactory, defaultFetchMode, unwrapLists, persistentThreads + ); + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return "Builder{" + dbUrlString() + '}'; } } } diff --git a/src/main/java/com/rethinkdb/net/ConnectionSocket.java b/src/main/java/com/rethinkdb/net/ConnectionSocket.java index 802d911b..ceb0eaa6 100644 --- a/src/main/java/com/rethinkdb/net/ConnectionSocket.java +++ b/src/main/java/com/rethinkdb/net/ConnectionSocket.java @@ -5,9 +5,8 @@ import javax.net.ssl.SSLContext; import java.io.Closeable; -import java.io.IOError; -import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; /** * A connection socket into the server. @@ -19,20 +18,57 @@ public interface ConnectionSocket extends Closeable { interface Factory { /** * Creates a new connection socket into the server. - * @param hostname the hostname - * @param port the post + * + * @param hostname the hostname + * @param port the post * @param sslContext an {@link SSLContext}, if any - * @param timeoutMs a timeout, in milliseconds, if any + * @param timeoutMs a timeout, in milliseconds, if any * @return a new {@link ConnectionSocket}. */ @NotNull ConnectionSocket newSocket(@NotNull String hostname, - int port, - @Nullable SSLContext sslContext, - @Nullable Long timeoutMs); + int port, + @Nullable SSLContext sslContext, + @Nullable Long timeoutMs); + } + + /** + * An asynchronous factory of sockets. + */ + interface AsyncFactory extends Factory { + /** + * Creates a new connection socket into the server. + * + * @param hostname the hostname + * @param port the post + * @param sslContext an {@link SSLContext}, if any + * @param timeoutMs a timeout, in milliseconds, if any + * @return a new {@link ConnectionSocket}. + */ + default @NotNull ConnectionSocket newSocket(@NotNull String hostname, + int port, + @Nullable SSLContext sslContext, + @Nullable Long timeoutMs) { + return newSocketAsync(hostname, port, sslContext, timeoutMs).join(); + } + + /** + * Creates a new connection socket asynchronously into the server. + * + * @param hostname the hostname + * @param port the post + * @param sslContext an {@link SSLContext}, if any + * @param timeoutMs a timeout, in milliseconds, if any + * @return a {@link CompletableFuture} which will complete with a new {@link ConnectionSocket}. + */ + @NotNull CompletableFuture newSocketAsync(@NotNull String hostname, + int port, + @Nullable SSLContext sslContext, + @Nullable Long timeoutMs); } /** * Checks if the connection socket is open. + * * @return true if the connection socket is open, false otherwise. */ boolean isOpen(); @@ -44,12 +80,14 @@ interface Factory { /** * Writes the contents of the buffer into the socket. + * * @param buffer the contents to write. */ void write(@NotNull ByteBuffer buffer); /** * Reads a defined amount of bytes, and wraps it in a {@link ByteBuffer}. + * * @param length the length of bytes to read. * @return a {@link ByteBuffer} with the read contents. */ diff --git a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java index 32b3b28a..bab9e3ba 100644 --- a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java +++ b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java @@ -22,22 +22,23 @@ /** * The default {@link ConnectionSocket.Factory} and {@link ResponsePump.Factory} for any default connections. */ -public class DefaultConnectionFactory implements ConnectionSocket.Factory, ResponsePump.Factory { +public class DefaultConnectionFactory implements ConnectionSocket.AsyncFactory, ResponsePump.Factory { public static final DefaultConnectionFactory INSTANCE = new DefaultConnectionFactory(); private DefaultConnectionFactory() { } @Override - public @NotNull ConnectionSocket newSocket(@NotNull String hostname, int port, SSLContext sslContext, Long timeoutMs) { - SocketWrapper s = new SocketWrapper(hostname, port, sslContext, timeoutMs); - s.connect(); - return s; + public @NotNull CompletableFuture newSocketAsync(@NotNull String hostname, + int port, + @Nullable SSLContext sslContext, + @Nullable Long timeoutMs) { + return CompletableFuture.supplyAsync(() -> new SocketWrapper(hostname, port, sslContext, timeoutMs).connect()); } @Override - public @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket) { - return new ThreadResponsePump(socket); + public @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket, boolean daemonThreads) { + return new ThreadResponsePump(socket, daemonThreads); } private static class SocketWrapper implements ConnectionSocket { @@ -62,7 +63,7 @@ private static class SocketWrapper implements ConnectionSocket { this.timeoutMs = timeoutMs; } - void connect() { + SocketWrapper connect() { try { // establish connection final InetSocketAddress addr = new InetSocketAddress(hostname, port); @@ -92,6 +93,7 @@ void connect() { } catch (IOException e) { throw new ReqlDriverError("Connection timed out.", e); } + return this; } @Override @@ -178,7 +180,7 @@ private static class ThreadResponsePump implements ResponsePump { private final Thread thread; private Map> awaiting = new ConcurrentHashMap<>(); - public ThreadResponsePump(ConnectionSocket socket) { + public ThreadResponsePump(ConnectionSocket socket, boolean daemon) { this.thread = new Thread(() -> { // pump responses until interrupted while (true) { @@ -194,17 +196,24 @@ public ThreadResponsePump(ConnectionSocket socket) { // read response and send it to whoever is waiting, if anyone try { - final Response response = Response.readFromSocket(socket); - final CompletableFuture awaiter = awaiting.remove(response.token); - if (awaiter != null) { - awaiter.complete(response); - } + CompletableFuture.supplyAsync(Response.readFromSocket(socket)).handle((response, t) -> { + if (t != null) { + shutdown(t); + } else { + final CompletableFuture awaiter = awaiting.remove(response.token); + if (awaiter != null) { + awaiter.complete(response); + } + } + return null; + }); } catch (Exception e) { shutdown(e); return; } } }, "RethinkDB-" + socket + "-ResponsePump"); + thread.setDaemon(daemon); thread.start(); } @@ -223,12 +232,12 @@ public boolean isAlive() { return thread.isAlive(); } - private void shutdown(Exception e) { + private void shutdown(Throwable t) { Map> awaiting = this.awaiting; this.awaiting = null; thread.interrupt(); if (awaiting != null) { - awaiting.forEach((token, future) -> future.completeExceptionally(e)); + awaiting.forEach((token, future) -> future.completeExceptionally(t)); } } diff --git a/src/main/java/com/rethinkdb/net/Response.java b/src/main/java/com/rethinkdb/net/Response.java index 2f7f3b0d..8875a020 100644 --- a/src/main/java/com/rethinkdb/net/Response.java +++ b/src/main/java/com/rethinkdb/net/Response.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Supplier; import java.util.stream.Collectors; public class Response { @@ -83,7 +84,7 @@ public String toString() { } @SuppressWarnings("unchecked") - public static Response readFromSocket(ConnectionSocket socket) { + public static Supplier readFromSocket(ConnectionSocket socket) { final ByteBuffer header = socket.read(12); final long token = header.getLong(); final int responseLength = header.getInt(); @@ -100,20 +101,22 @@ public static Response readFromSocket(ConnectionSocket socket) { ); } - Map json = Internals.readJson(buffer); - return new Response( - token, - ResponseType.fromValue(((Long) json.get("t")).intValue()), - (List) json.getOrDefault("r", Collections.emptyList()), - ((List) json.getOrDefault("n", Collections.emptyList())) - .stream() - .map(Long::intValue) - .map(ResponseNote::maybeFromValue) - .filter(Objects::nonNull) - .collect(Collectors.toList()), - Profile.fromList((List) json.get("p")), - Backtrace.fromList((List) json.getOrDefault("b", null)), - json.containsKey("e") ? ErrorType.maybeFromValue(((Long) json.get("e")).intValue()) : null - ); + return () -> { + Map json = Internals.readJson(buffer); + return new Response( + token, + ResponseType.fromValue(((Long) json.get("t")).intValue()), + (List) json.getOrDefault("r", Collections.emptyList()), + ((List) json.getOrDefault("n", Collections.emptyList())) + .stream() + .map(Long::intValue) + .map(ResponseNote::maybeFromValue) + .filter(Objects::nonNull) + .collect(Collectors.toList()), + Profile.fromList((List) json.get("p")), + Backtrace.fromList((List) json.getOrDefault("b", null)), + json.containsKey("e") ? ErrorType.maybeFromValue(((Long) json.get("e")).intValue()) : null + ); + }; } } diff --git a/src/main/java/com/rethinkdb/net/ResponsePump.java b/src/main/java/com/rethinkdb/net/ResponsePump.java index 497b223e..9546eb67 100644 --- a/src/main/java/com/rethinkdb/net/ResponsePump.java +++ b/src/main/java/com/rethinkdb/net/ResponsePump.java @@ -14,14 +14,31 @@ public interface ResponsePump { interface Factory { /** * Creates a new response pump using the provided connection socket. + * * @param socket the {@link ConnectionSocket} to pump response pumps from * @return a new {@link ResponsePump}. + * @deprecated Implement the {@link Factory#newPump(ConnectionSocket, boolean)} method. + * (Will be removed on v2.5.0) */ - @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket); + @Deprecated + default @NotNull ResponsePump newPump(@NotNull ConnectionSocket socket) { + throw new UnsupportedOperationException(); + } + + /** + * Creates a new response pump using the provided connection socket. + * + * @param socket the {@link ConnectionSocket} to pump response pumps from + * @param daemonThreads suggestion for using daemon threads and not blocking the process to exit. + * @return a new {@link ResponsePump}. + */ + @NotNull + ResponsePump newPump(@NotNull ConnectionSocket socket, boolean daemonThreads); } /** * Creates a response awaiter for a query token. + * * @param token the query token * @return a {@link CompletableFuture} that completes with a {@link Response} that matches the token. */ @@ -30,6 +47,7 @@ interface Factory { /** * Checks if the response pump is alive. + * * @return true if the response pump is alive, false otherwise. */ boolean isAlive(); diff --git a/src/main/java/com/rethinkdb/net/Result.java b/src/main/java/com/rethinkdb/net/Result.java index 4f9d0e2b..3d91667b 100644 --- a/src/main/java/com/rethinkdb/net/Result.java +++ b/src/main/java/com/rethinkdb/net/Result.java @@ -119,7 +119,7 @@ public Result(Connection connection, this.firstRes = firstRes; this.fetchMode = fetchMode; this.typeRef = typeRef; - fmt = new Internals.FormatOptions(query.globalOptions); + this.fmt = Internals.parseFormatOptions(query.globalOptions); this.unwrapLists = unwrapLists; currentResponse.set(firstRes); handleFirstResponse(); diff --git a/src/main/java/com/rethinkdb/utils/Internals.java b/src/main/java/com/rethinkdb/utils/Internals.java index b42a9dc6..7fe8c134 100644 --- a/src/main/java/com/rethinkdb/utils/Internals.java +++ b/src/main/java/com/rethinkdb/utils/Internals.java @@ -272,20 +272,34 @@ public static SSLContext readCertFile(@NotNull InputStream certFile) { } } + public static FormatOptions parseFormatOptions(OptArgs args) { + if (args == null) return FormatOptions.DEFAULT; + + Datum time_format = (Datum) args.get("time_format"); + boolean rawTime = time_format != null && "raw".equals(time_format.datum); + + Datum binary_format = (Datum) args.get("binary_format"); + boolean rawBinary = binary_format != null && "raw".equals(binary_format.datum); + + Datum group_format = (Datum) args.get("group_format"); + boolean rawGroups = group_format != null && "raw".equals(group_format.datum); + + if (rawTime || rawBinary || rawGroups) { + return new FormatOptions(rawTime, rawGroups, rawBinary); + } + return FormatOptions.DEFAULT; + } + public static class FormatOptions { + public static final FormatOptions DEFAULT = new FormatOptions(false, false, false); public final boolean rawTime; public final boolean rawGroups; public final boolean rawBinary; - public FormatOptions(OptArgs args) { - Datum time_format = (Datum) args.get("time_format"); - this.rawTime = time_format != null && "raw".equals(time_format.datum); - - Datum binary_format = (Datum) args.get("binary_format"); - this.rawBinary = binary_format != null && "raw".equals(binary_format.datum); - - Datum group_format = (Datum) args.get("group_format"); - this.rawGroups = group_format != null && "raw".equals(group_format.datum); + public FormatOptions(boolean rawTime, boolean rawGroups, boolean rawBinary) { + this.rawTime = rawTime; + this.rawGroups = rawGroups; + this.rawBinary = rawBinary; } } } diff --git a/src/test/java/com/rethinkdb/AuthTest.java b/src/test/java/com/rethinkdb/AuthTest.java index f85ede78..62ee158d 100644 --- a/src/test/java/com/rethinkdb/AuthTest.java +++ b/src/test/java/com/rethinkdb/AuthTest.java @@ -35,14 +35,14 @@ public static void oneTimeTearDown() throws Exception { @Test public void testConnectWithNonAdminUser() throws Exception { - Connection bogusConn = TestingFramework.defaultConnectionBuilder().copyOf() + Connection bogusConn = r.connection(TestingFramework.defaultConnectionBuilder()) .user(bogusUsername, bogusPassword).connect(); bogusConn.close(); } @Test (expected=ReqlDriverError.class) public void testConnectWithBothAuthKeyAndUsername() throws Exception { - Connection bogusConn = TestingFramework.defaultConnectionBuilder().copyOf() + Connection bogusConn = r.connection(TestingFramework.defaultConnectionBuilder()) .user(bogusUsername, bogusPassword).authKey("test").connect(); } } diff --git a/src/test/java/com/rethinkdb/DbUrlTest.java b/src/test/java/com/rethinkdb/DbUrlTest.java index bc71f7d3..bdb016fc 100644 --- a/src/test/java/com/rethinkdb/DbUrlTest.java +++ b/src/test/java/com/rethinkdb/DbUrlTest.java @@ -10,9 +10,12 @@ public class DbUrlTest { public static final RethinkDB r = RethinkDB.r; - private static final String DB_URL_STANDARD = "rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?auth_key=mykey&timeout=30"; - private static final String DB_URL_NON_STANDARD = "rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?auth_key=mykey&timeout=30&java.default_fetch_mode=lazy&java.unwrap_lists=true"; - private static final String DB_URL_NON_STANDARD_ALTERNATE = "rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?authKey=mykey&timeout=30&java.defaultFetchMode=lazy&java.unwrapLists=true"; + private static final String DB_URL_STANDARD = + "rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?timeout=30"; + private static final String DB_URL_NON_STANDARD = + "rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?timeout=30&java.default_fetch_mode=lazy&java.unwrap_lists=true&java.persistent_threads=true"; + private static final String DB_URL_NON_STANDARD_ALTERNATE = + "rethinkdb://bogus_man:bogus_pass@myhost:1234/mydb?timeout=30&java.defaultFetchMode=lazy&java.unwrapLists=enabled&java.persistentThreads=enabled"; @Test public void testStandardDbUrl() { @@ -20,13 +23,24 @@ public void testStandardDbUrl() { assertEquals(URI.create(DB_URL_STANDARD), r.connection(DB_URL_STANDARD).dbUrl()); assertEquals( r.connection(DB_URL_STANDARD), - r.connection().user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") - .authKey("mykey").timeout(30L) + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + + .timeout(30L) ); assertEquals( DB_URL_STANDARD, - r.connection().user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") - .authKey("mykey").timeout(30L).dbUrlString() + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + + .timeout(30L) + .dbUrlString() ); } @@ -36,13 +50,28 @@ public void testNonStandardDbUrl() { assertEquals(URI.create(DB_URL_NON_STANDARD), r.connection(DB_URL_NON_STANDARD).dbUrl()); assertEquals( r.connection(DB_URL_NON_STANDARD), - r.connection().user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") - .authKey("mykey").timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true) + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + .timeout(30L) + .defaultFetchMode(Result.FetchMode.LAZY) + .unwrapLists(true) + .persistentThreads(true) ); assertEquals( DB_URL_NON_STANDARD, - r.connection().user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") - .authKey("mykey").timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).dbUrlString() + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + .timeout(30L) + .defaultFetchMode(Result.FetchMode.LAZY) + .unwrapLists(true) + .persistentThreads(true) + .dbUrlString() ); } @@ -53,25 +82,43 @@ public void testNonStandardAlternateDbUrl() { assertEquals(URI.create(DB_URL_NON_STANDARD), r.connection(DB_URL_NON_STANDARD_ALTERNATE).dbUrl()); assertEquals( r.connection(DB_URL_NON_STANDARD), - r.connection().user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") - .authKey("mykey").timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true) + r.connection() + .user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") + .timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).persistentThreads(true) ); assertEquals( r.connection(DB_URL_NON_STANDARD_ALTERNATE), - r.connection().user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") - .authKey("mykey").timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true) + r.connection() + .user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") + .timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).persistentThreads(true) ); assertEquals( DB_URL_NON_STANDARD, - r.connection().user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") - .authKey("mykey").timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).dbUrlString() + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + .timeout(30L) + .defaultFetchMode(Result.FetchMode.LAZY) + .unwrapLists(true) + .persistentThreads(true) + .dbUrlString() ); assertNotEquals(DB_URL_NON_STANDARD_ALTERNATE, r.connection(DB_URL_NON_STANDARD_ALTERNATE).dbUrlString()); assertNotEquals(URI.create(DB_URL_NON_STANDARD_ALTERNATE), r.connection(DB_URL_NON_STANDARD_ALTERNATE).dbUrl()); assertNotEquals( DB_URL_NON_STANDARD_ALTERNATE, - r.connection().user("bogus_man", "bogus_pass").hostname("myhost").port(1234).db("mydb") - .authKey("mykey").timeout(30L).defaultFetchMode(Result.FetchMode.LAZY).unwrapLists(true).dbUrlString() + r.connection() + .user("bogus_man", "bogus_pass") + .hostname("myhost") + .port(1234) + .db("mydb") + .timeout(30L) + .defaultFetchMode(Result.FetchMode.LAZY) + .unwrapLists(true) + .persistentThreads(true) + .dbUrlString() ); } } diff --git a/src/test/java/com/rethinkdb/RethinkDBTest.java b/src/test/java/com/rethinkdb/RethinkDBTest.java index abc52b29..b6b4ddea 100644 --- a/src/test/java/com/rethinkdb/RethinkDBTest.java +++ b/src/test/java/com/rethinkdb/RethinkDBTest.java @@ -401,7 +401,7 @@ public void testConcurrentCursor() throws TimeoutException, InterruptedException } @Test - public void testNoreply() throws Exception { + public void testNoreply() { r.expr(null).runNoReply(conn); } } diff --git a/src/test/java/com/rethinkdb/TestingFramework.java b/src/test/java/com/rethinkdb/TestingFramework.java index df05d8e7..c72cacce 100644 --- a/src/test/java/com/rethinkdb/TestingFramework.java +++ b/src/test/java/com/rethinkdb/TestingFramework.java @@ -1,78 +1,45 @@ package com.rethinkdb; import com.rethinkdb.net.Connection; +import com.rethinkdb.net.Result; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Objects; -import java.util.Properties; +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; + +import static com.rethinkdb.RethinkDB.*; +import static java.nio.charset.StandardCharsets.*; /** * Very basic testing framework lying miserably in the java's default package. */ public class TestingFramework { + private static final String OVERRIDE_FILE_NAME = "test-dburl-override.txt"; - private static final String DEFAULT_CONFIG_RESOURCE = "default-config.properties"; - private static final String OVERRIDE_FILE_NAME = "test-config-override.properties"; - - // properties used to populate configuration - private static final String PROP_HOSTNAME = "hostName"; - private static final String PROP_PORT = "port"; - private static final String PROP_AUTHKEY = "authKey"; - - private static Connection.Builder defaultConnectionBuilder; + private static Connection.Builder builder; /** * Provision a connection builder based on the test configuration. *

- * Put a propertiy file called "test-config-override.properties" in the working - * directory of the tests to override default values. - *

- * Example: - *

-     *     hostName=myHost
-     *     port=12345
-     * 
- *

+ * Put a propertiy file called "test-dburl-override.txt" in the working directory of the tests to override default values. + * The file contents must be a RethinkDB db-url. * * @return Default connection builder. */ public static Connection.Builder defaultConnectionBuilder() { - if (defaultConnectionBuilder == null) { - Properties config = new Properties(); - - try (InputStream is = TestingFramework.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG_RESOURCE)) { - config.load(Objects.requireNonNull(is)); - } catch (NullPointerException | IOException e) { - throw new IllegalStateException(e); - } - - // Check the local override file. - String workdir = System.getProperty("user.dir"); - File defaultFile = new File(workdir, OVERRIDE_FILE_NAME); + if (builder == null) { + File defaultFile = new File(OVERRIDE_FILE_NAME); if (defaultFile.exists()) { - try (InputStream is = new FileInputStream(defaultFile)) { - config.load(is); + try { + builder = r.connection(new String(Files.readAllBytes(defaultFile.toPath()), UTF_8)); } catch (IOException e) { throw new IllegalStateException(e); } - } - - // provision connection builder based on configuration - defaultConnectionBuilder = RethinkDB.r.connection(); - // mandatory fields - defaultConnectionBuilder = defaultConnectionBuilder.hostname(config.getProperty(PROP_HOSTNAME).trim()); - defaultConnectionBuilder = defaultConnectionBuilder.port(Integer.parseInt(config.getProperty(PROP_PORT).trim())); - // optional fields - final String authKey = config.getProperty(PROP_AUTHKEY); - if (authKey != null) { - defaultConnectionBuilder.authKey(config.getProperty(PROP_AUTHKEY).trim()); + } else { + builder = r.connection(); } } - - return defaultConnectionBuilder; + return builder; } /** diff --git a/templates/TopLevel.java b/templates/TopLevel.java index d5656724..3220aac3 100644 --- a/templates/TopLevel.java +++ b/templates/TopLevel.java @@ -1,7 +1,6 @@ <%page args="all_terms" /> package com.rethinkdb.gen.model; -import com.rethinkdb.ast.ReqlAst; import com.rethinkdb.model.Arguments; import com.rethinkdb.model.MapObject; import com.rethinkdb.gen.ast.Error; @@ -9,7 +8,6 @@ import com.rethinkdb.gen.exc.ReqlDriverError; import com.rethinkdb.utils.Internals; -import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -25,7 +23,7 @@ public ReqlExpr row(Object... values) { " Use lambda syntax instead"); } - public static Object pathspec(Object... path) { + public Object pathspec(Object... path) { if (path.length < 2) { throw new ReqlDriverError("r.pathspec(...) requires at least two parameters."); }