Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions core/src/main/java/org/apache/druid/data/input/Firehose.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.data.input;

import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.parsers.ParseException;

import javax.annotation.Nullable;
import java.io.Closeable;
Expand Down Expand Up @@ -66,6 +67,24 @@ public interface Firehose extends Closeable
@Nullable
InputRow nextRow();

/**
 * Fetches the next available row together with its raw, unparsed form. The sampler uses this to give the caller
 * information that helps in configuring a parse spec. A ParseException thrown while obtaining the row is not
 * propagated; it is captured in the returned InputRowPlusRaw so the failure can still be reported. Should only be
 * called if hasMore returns true.
 *
 * This default implementation has no access to the raw data, so it always passes null for the raw portion.
 *
 * @return an InputRowPlusRaw which may contain any of: an InputRow, the raw data, or a ParseException
 */
default InputRowPlusRaw nextRowWithRaw()
{
  final InputRow row;
  try {
    row = nextRow();
  }
  catch (ParseException e) {
    // No raw bytes are available at this level, so only the exception is handed back.
    return InputRowPlusRaw.of(null, e);
  }
  return InputRowPlusRaw.of(row, null);
}

/**
* Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is
* often equivalent to everything that has been read since the last commit() call (or instantiation of the object),
Expand All @@ -79,9 +98,9 @@ public interface Firehose extends Closeable
* The Runnable is essentially just a lambda/closure that is run() after data supplied by this instance has
* been committed on the writer side of this interface protocol.
* <p>
* A simple implementation of this interface might do nothing when run() is called
* (in which case the same do-nothing instance can be returned every time), or
* a more complex implementation might clean up temporary resources that are no longer needed
* A simple implementation of this interface might do nothing when run() is called
* (in which case the same do-nothing instance can be returned every time), or
* a more complex implementation might clean up temporary resources that are no longer needed
* because of InputRows delivered by prior calls to {@link #nextRow()}.
* </p>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ default Firehose connect(T parser, @Nullable File temporaryDirectory) throws IOE
return connect(parser);
}

/**
* Initialization method that connects up the firehose. This method is intended for use by the sampler, and allows
* implementors to return a more efficient firehose, knowing that only a small number of rows will be read. The
* default implementation simply delegates to {@code connect(parser, temporaryDirectory)}.
*
* @param parser an input row parser
* @param temporaryDirectory a directory where temporary files are stored
* @return the firehose produced by {@code connect(parser, temporaryDirectory)}, unless an implementor overrides this
*/
default Firehose connectForSampler(T parser, @Nullable File temporaryDirectory) throws IOException, ParseException
{
return connect(parser, temporaryDirectory);
}

default boolean isSplittable()
{
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

import org.apache.druid.java.util.common.parsers.ParseException;

import javax.annotation.Nullable;

public class InputRowPlusRaw
{
@Nullable
private final InputRow inputRow;

@Nullable
private final byte[] raw;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you please add a comment about when raw can be null? I guess inputRow would be null if parsing fails, but not sure when raw would be null.


@Nullable
private final ParseException parseException;

/**
* Stores the three components as given; any of them may be null. The constructor is private, so instances are
* created through the static {@code of(...)} factories of this class.
*/
private InputRowPlusRaw(@Nullable InputRow inputRow, @Nullable byte[] raw, @Nullable ParseException parseException)
{
this.inputRow = inputRow;
this.raw = raw;
this.parseException = parseException;
}

/**
* The parsed row, if there is one. Instances created with {@code of(byte[], ParseException)} always have a null
* input row; instances created with {@code of(InputRow, byte[])} carry whatever row was passed in, which may itself
* be null.
*/
@Nullable
public InputRow getInputRow()
{
return inputRow;
}

/**
* The raw, unparsed event (as opposed to an {@link InputRow} which is the output of a parser). The interface default
* for {@link Firehose#nextRowWithRaw()} sets this to null, so this will only be non-null if nextRowWithRaw() is
* overridden by an implementation, such as in
* {@link org.apache.druid.data.input.impl.FileIteratingFirehose#nextRowWithRaw()}. Note that returning the raw row
* does not make sense for some sources (e.g. non-row based types), so clients should be able to handle this field
* being unset.
*
* @return the stored raw byte array, or null if none was provided; the array is returned as-is, not copied
*/
@Nullable
public byte[] getRaw()
{
return raw;
}

/**
* The ParseException associated with this row, if any. Instances created with {@code of(InputRow, byte[])} always
* have a null parse exception; instances created with {@code of(byte[], ParseException)} carry whatever exception
* was passed in.
*/
@Nullable
public ParseException getParseException()
{
return parseException;
}

/**
 * Reports whether this object carries nothing at all.
 *
 * @return true only when the input row, the raw data, and the parse exception are all null
 */
public boolean isEmpty()
{
  if (inputRow != null || raw != null) {
    return false;
  }
  return parseException == null;
}

/**
* Creates an instance holding the given row and raw data, with no parse exception.
*
* @param inputRow the parsed row, or null
* @param raw the raw, unparsed data, or null
*/
public static InputRowPlusRaw of(@Nullable InputRow inputRow, @Nullable byte[] raw)
{
return new InputRowPlusRaw(inputRow, raw, null);
}

/**
* Creates an instance holding the given raw data and parse exception, with no input row.
*
* @param raw the raw, unparsed data, or null
* @param parseException the exception to report for this row, or null
*/
public static InputRowPlusRaw of(@Nullable byte[] raw, @Nullable ParseException parseException)
{
return new InputRowPlusRaw(null, raw, parseException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.commons.io.LineIterator;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.Runnables;

import javax.annotation.Nullable;
Expand All @@ -30,8 +33,6 @@
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
*/
public class FileIteratingFirehose implements Firehose
{
private final Iterator<LineIterator> lineIterators;
Expand Down Expand Up @@ -81,6 +82,22 @@ public InputRow nextRow()
return parser.parse(lineIterator.next());
}

/**
 * Reads the next line and returns it both parsed and as UTF-8 bytes. If the parser throws a ParseException, the
 * exception is returned together with the raw bytes instead of being propagated.
 *
 * @throws NoSuchElementException if hasMore() returns false
 */
@Override
public InputRowPlusRaw nextRowWithRaw()
{
  if (!hasMore()) {
    throw new NoSuchElementException();
  }

  final String line = lineIterator.next();
  final InputRow parsed;
  try {
    parsed = parser.parse(line);
  }
  catch (ParseException e) {
    // Keep the offending line so the caller can see what failed to parse.
    return InputRowPlusRaw.of(StringUtils.toUtf8(line), e);
  }
  return InputRowPlusRaw.of(parsed, StringUtils.toUtf8(line));
}

private LineIterator getNextLineIterator()
{
if (lineIterator != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private static class ParseCtx

private final String timestampColumn;
private final String timestampFormat;
// this value should never be set for production data
// this value should never be set for production data; the data loader uses it before a timestamp column is chosen
private final DateTime missingValue;
/** This field is a derivative of {@link #timestampFormat}; not checked in {@link #equals} and {@link #hashCode} */
private final Function<Object, DateTime> timestampConverter;
Expand All @@ -59,7 +59,7 @@ private static class ParseCtx
public TimestampSpec(
@JsonProperty("column") String timestampColumn,
@JsonProperty("format") String format,
// this value should never be set for production data
// this value should never be set for production data; the data loader uses it before a timestamp column is chosen
@JsonProperty("missingValue") DateTime missingValue
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public abstract class PrefetchableTextFilesFirehoseFactory<T>
{
private static final Logger LOG = new Logger(PrefetchableTextFilesFirehoseFactory.class);

// Passed by connectForSampler in place of this factory's configured cacheManager and prefetchConfig.
// NOTE(review): the zero arguments presumably turn caching and prefetching off, as the names suggest -- confirm
// against CacheManager and PrefetchConfig.
private static final CacheManager DISABLED_CACHE_MANAGER = new CacheManager(0);
private static final PrefetchConfig DISABLED_PREFETCH_CONFIG = new PrefetchConfig(0L, 0L, 0L, 0L);

public static final int DEFAULT_MAX_FETCH_RETRY = 3;

private final CacheManager<T> cacheManager;
Expand Down Expand Up @@ -157,6 +160,22 @@ CacheManager<T> getCacheManager()

/**
* Connects using this factory's own prefetchConfig and cacheManager; the work is done by connectInternal.
*/
@Override
public Firehose connect(StringInputRowParser firehoseParser, @Nullable File temporaryDirectory) throws IOException
{
return connectInternal(firehoseParser, temporaryDirectory, this.prefetchConfig, this.cacheManager);
}

/**
* Connects for the sampler. Same path as connect, except that DISABLED_PREFETCH_CONFIG and DISABLED_CACHE_MANAGER
* are passed to connectInternal instead of this factory's configured prefetchConfig and cacheManager.
*/
@Override
public Firehose connectForSampler(StringInputRowParser parser, @Nullable File temporaryDirectory) throws IOException
{
return connectInternal(parser, temporaryDirectory, DISABLED_PREFETCH_CONFIG, DISABLED_CACHE_MANAGER);
}

private Firehose connectInternal(
StringInputRowParser firehoseParser,
@Nullable File temporaryDirectory,
PrefetchConfig prefetchConfig,
CacheManager cacheManager
) throws IOException
{
if (objects == null) {
objects = ImmutableList.copyOf(Preconditions.checkNotNull(initObjects(), "objects"));
Expand Down
Loading