diff --git a/src/main/java/com/rethinkdb/net/Connection.java b/src/main/java/com/rethinkdb/net/Connection.java index 1765d385..d34016a4 100644 --- a/src/main/java/com/rethinkdb/net/Connection.java +++ b/src/main/java/com/rethinkdb/net/Connection.java @@ -182,11 +182,12 @@ public boolean isOpen() { @Nullable TypeReference typeRef) { try { return runAsync(term, optArgs, fetchMode, typeRef).join(); - } catch (CompletionException e) { - if (e.getCause() instanceof ReqlError) { - throw ((ReqlError) e.getCause()); + } catch (CompletionException ce) { + Throwable t = ce.getCause(); + if (t instanceof ReqlError) { + throw ((ReqlError) t); } - throw e; + throw new ReqlDriverError(t); } } @@ -212,12 +213,12 @@ public boolean isOpen() { public @NotNull Server server() { try { return serverAsync().join(); - } catch ( - CompletionException e) { - if (e.getCause() instanceof ReqlError) { - throw ((ReqlError) e.getCause()); + } catch (CompletionException ce) { + Throwable t = ce.getCause(); + if (t instanceof ReqlError) { + throw ((ReqlError) t); } - throw e; + throw new ReqlDriverError(t); } } @@ -236,11 +237,12 @@ public boolean isOpen() { public void noreplyWait() { try { noreplyWaitAsync().join(); - } catch (CompletionException e) { - if (e.getCause() instanceof ReqlError) { - throw ((ReqlError) e.getCause()); + } catch (CompletionException ce) { + Throwable t = ce.getCause(); + if (t instanceof ReqlError) { + throw ((ReqlError) t); } - throw e; + throw new ReqlDriverError(t); } } diff --git a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java index 674a4f9d..32b3b28a 100644 --- a/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java +++ b/src/main/java/com/rethinkdb/net/DefaultConnectionFactory.java @@ -106,25 +106,30 @@ public void write(@NotNull ByteBuffer buffer) { @Override public @NotNull String readCString(@Nullable Long deadline) { - try { - final StringBuilder sb = new StringBuilder(); - int next; - char c; - while ((next = inputStream.read()) != -1 && (c = (char) next) != '\0') { - // is there a deadline? - if (deadline != null) { - // have we timed-out? - if (deadline < System.currentTimeMillis()) { // reached time-out - throw new ReqlDriverError("Connection timed out."); - } + Long timeout = deadline == null ? null : System.currentTimeMillis() + deadline; + final StringBuilder b = new StringBuilder(); + int has; + int next; + char c; + while (timeout == null || System.currentTimeMillis() < timeout) { + try { + has = inputStream.available(); + if (has < 0) { + break; + } + if (has == 0) { + Thread.yield(); + continue; } - sb.append(c); + if ((next = inputStream.read()) == -1 || (c = (char) next) == '\0') { + return b.toString(); + } + } catch (IOException e) { + throw new ReqlDriverError(e); } - - return sb.toString(); - } catch (IOException e) { - throw new ReqlDriverError(e); + b.append(c); } + throw new ReqlDriverError("Read timed out."); } @Override @@ -231,5 +236,10 @@ private void shutdown(Exception e) { public void shutdownPump() { shutdown(new ReqlDriverError("Response pump closed.")); } + + @Override + public String toString() { + return "ThreadResponsePump"; + } } }