Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion receiver/restapireceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Alpha:
| `oauth2` | object | | `false` | OAuth2 Client Credentials configuration (see below) |
| `akamai_edgegrid` | object | | `false` | Akamai EdgeGrid configuration (see below) |
| `pagination` | object | | `false` | Pagination configuration (see below) |
| `max_poll_interval` | duration | `5m` | `false` | Maximum interval between API polls. The receiver uses adaptive polling that starts fast and backs off when no data is returned, up to this maximum. |
| `min_poll_interval` | duration | `10s` | `false` | Minimum interval between API polls. The receiver resets to this interval when a full page of data is received. Increase this to prevent hitting API rate limits. |
| `max_poll_interval` | duration | `5m` | `false` | Maximum interval between API polls. The receiver uses adaptive polling that starts at `min_poll_interval` and backs off when no data or a partial page is returned, up to this maximum. |
| `backoff_multiplier` | float | `2.0` | `false` | Multiplier for increasing the poll interval when no data or a partial page is returned. Must be greater than 1.0. |
| `storage` | component | | `false` | The component ID of a storage extension for checkpointing |
| `timeout` | duration | `10s` | `false` | HTTP client timeout |

Expand Down Expand Up @@ -379,6 +381,18 @@ The receiver expects JSON responses in one of two formats:

When using the second format, specify the field name in `response_field` (e.g., `"data"`).

## Adaptive Polling

The receiver uses adaptive polling to balance responsiveness with API rate limits. Instead of polling at a fixed interval, it adjusts the poll interval based on whether data is being returned.

- **On startup**, the receiver polls at `min_poll_interval`.
- **When a full page is returned** (indicating more data may be waiting), the interval resets to `min_poll_interval` to fetch remaining data quickly.
- **When no data or a partial page is returned** (indicating the receiver is caught up), the interval is multiplied by `backoff_multiplier` each cycle, up to `max_poll_interval`.

For example, with the defaults (`min_poll_interval: 10s`, `max_poll_interval: 5m`, `backoff_multiplier: 2.0`), the polling intervals when no new data arrives would be: 10s, 20s, 40s, 80s, 160s, 300s (capped at 5m). As soon as a full page of data is returned, the interval resets back to 10s.

To poll at a fixed interval, set `min_poll_interval` and `max_poll_interval` to the same value.

## Checkpointing

When a storage extension is configured, the receiver saves its pagination state to storage. This allows the receiver to resume from where it left off after a restart, preventing duplicate data collection.
Expand Down
36 changes: 34 additions & 2 deletions receiver/restapireceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,22 @@ type Config struct {
// Pagination defines pagination configuration.
Pagination PaginationConfig `mapstructure:"pagination"`

// MinPollInterval is the minimum interval between API polls.
// The receiver uses adaptive polling that resets to this interval when a full
// page of data is received, and backs off when no data or a partial page is returned.
MinPollInterval time.Duration `mapstructure:"min_poll_interval"`

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also expose the backoff multiplier as a parameter?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably a good idea, but not required for MVP I would think.

Still, so quick, maybe we should just do it?

// MaxPollInterval is the maximum interval between API polls.
// The receiver uses adaptive polling that starts with a short interval and
// backs off when no data is returned, up to this maximum.
MaxPollInterval time.Duration `mapstructure:"max_poll_interval"`

// BackoffMultiplier is the multiplier for increasing the poll interval
// when no data or a partial page is returned. For example, with a multiplier
// of 2.0 and a current interval of 10s, the next interval will be 20s.
// Must be greater than 1.0. Defaults to 2.0.
BackoffMultiplier float64 `mapstructure:"backoff_multiplier"`

// ClientConfig defines HTTP client configuration.
ClientConfig confighttp.ClientConfig `mapstructure:",squash"`

Expand Down Expand Up @@ -365,13 +376,34 @@ func (c *Config) Validate() error {
}
}

// Apply default if not configured (zero value means not set)
// Apply defaults if not configured (zero value means not set)
if c.MinPollInterval == 0 {
c.MinPollInterval = 10 * time.Second
}

if c.MinPollInterval < 0 {
return fmt.Errorf("min_poll_interval must be greater than or equal to 0")
}

if c.MaxPollInterval == 0 {
c.MaxPollInterval = 5 * time.Minute
}

if c.MaxPollInterval < 0 {
return fmt.Errorf("max_poll_interval must be greater than 0")
return fmt.Errorf("max_poll_interval must be greater than or equal to 0")
}

if c.MinPollInterval > c.MaxPollInterval {
return fmt.Errorf("min_poll_interval (%s) must be less than or equal to max_poll_interval (%s)", c.MinPollInterval, c.MaxPollInterval)
}

// Apply default backoff multiplier if not configured
if c.BackoffMultiplier == 0 {
c.BackoffMultiplier = 2.0
}

if c.BackoffMultiplier <= 1.0 {
return fmt.Errorf("backoff_multiplier must be greater than 1.0")
}

return nil
Expand Down
54 changes: 53 additions & 1 deletion receiver/restapireceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,22 @@ func TestConfig_Validate(t *testing.T) {
},
expectedErr: `initial_timestamp "invalid-timestamp" could not be parsed`,
},
{
name: "negative min_poll_interval",
config: &Config{
URL: "https://api.example.com/data",
AuthMode: authModeAPIKey,
APIKeyConfig: APIKeyConfig{
HeaderName: "X-API-Key",
Value: "test-key",
},
Pagination: PaginationConfig{
Mode: paginationModeNone,
},
MinPollInterval: -1 * time.Second,
},
expectedErr: "min_poll_interval must be greater than or equal to 0",
},
{
name: "negative max_poll_interval",
config: &Config{
Expand All @@ -616,7 +632,41 @@ func TestConfig_Validate(t *testing.T) {
},
MaxPollInterval: -1 * time.Minute,
},
expectedErr: "max_poll_interval must be greater than 0",
expectedErr: "max_poll_interval must be greater than or equal to 0",
},
{
name: "min_poll_interval greater than max_poll_interval",
config: &Config{
URL: "https://api.example.com/data",
AuthMode: authModeAPIKey,
APIKeyConfig: APIKeyConfig{
HeaderName: "X-API-Key",
Value: "test-key",
},
Pagination: PaginationConfig{
Mode: paginationModeNone,
},
MinPollInterval: 10 * time.Minute,
MaxPollInterval: 5 * time.Minute,
},
expectedErr: "min_poll_interval (10m0s) must be less than or equal to max_poll_interval (5m0s)",
},
{
name: "valid min_poll_interval",
config: &Config{
URL: "https://api.example.com/data",
AuthMode: authModeAPIKey,
APIKeyConfig: APIKeyConfig{
HeaderName: "X-API-Key",
Value: "test-key",
},
Pagination: PaginationConfig{
Mode: paginationModeNone,
},
MinPollInterval: 30 * time.Second,
MaxPollInterval: 5 * time.Minute,
},
expectedErr: "",
},
}

Expand All @@ -639,6 +689,7 @@ func TestConfig_DefaultValues(t *testing.T) {

require.Equal(t, authModeNone, cfg.AuthMode)
require.Equal(t, paginationModeNone, cfg.Pagination.Mode)
require.Equal(t, 10*time.Second, cfg.MinPollInterval)
require.Equal(t, 5*time.Minute, cfg.MaxPollInterval)
require.Equal(t, 0, cfg.Pagination.PageLimit)
require.False(t, cfg.Pagination.ZeroBasedIndex)
Expand Down Expand Up @@ -671,6 +722,7 @@ func TestLoadConfigFromYAML(t *testing.T) {
// Verify the config values were parsed correctly
require.Equal(t, "https://api.example.com/data", restapiCfg.URL)
require.Equal(t, "data", restapiCfg.ResponseField)
require.Equal(t, 10*time.Second, restapiCfg.MinPollInterval)
require.Equal(t, 5*time.Minute, restapiCfg.MaxPollInterval)
require.Equal(t, authModeAPIKey, restapiCfg.AuthMode)
require.Equal(t, "test-key", restapiCfg.APIKeyConfig.Value)
Expand Down
4 changes: 3 additions & 1 deletion receiver/restapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func createDefaultConfig() component.Config {
PageLimit: 0,
ZeroBasedIndex: false,
},
MaxPollInterval: 5 * time.Minute,
MinPollInterval: 10 * time.Second,
MaxPollInterval: 5 * time.Minute,
BackoffMultiplier: 2.0,
ClientConfig: confighttp.ClientConfig{
Timeout: 10 * time.Second,
},
Expand Down
47 changes: 28 additions & 19 deletions receiver/restapireceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -33,9 +35,9 @@ import (

// TestIntegration_EndToEnd_Logs tests a complete end-to-end scenario for logs collection.
func TestIntegration_EndToEnd_Logs(t *testing.T) {
requestCount := 0
var requestCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
requestCount++
requestCount.Add(1)
response := map[string]any{
"logs": []map[string]any{
{"id": "1", "level": "info", "message": "test log 1", "timestamp": time.Now().Format(time.RFC3339)},
Expand Down Expand Up @@ -85,14 +87,14 @@ func TestIntegration_EndToEnd_Logs(t *testing.T) {
require.Greater(t, len(allLogs), 0)

// Verify multiple requests were made
require.Greater(t, requestCount, 1)
require.Greater(t, int(requestCount.Load()), 1)
}

// TestIntegration_EndToEnd_Metrics tests a complete end-to-end scenario for metrics collection.
func TestIntegration_EndToEnd_Metrics(t *testing.T) {
requestCount := 0
var requestCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
requestCount++
requestCount.Add(1)
response := []map[string]any{
{"metric": "cpu_usage", "value": 75.5, "timestamp": time.Now().Format(time.RFC3339)},
{"metric": "memory_usage", "value": 60.2, "timestamp": time.Now().Format(time.RFC3339)},
Expand Down Expand Up @@ -141,12 +143,12 @@ func TestIntegration_EndToEnd_Metrics(t *testing.T) {
require.Greater(t, len(allMetrics), 0)

// Verify multiple requests were made
require.Greater(t, requestCount, 1)
require.Greater(t, int(requestCount.Load()), 1)
}

// TestIntegration_WithPaginationAndAuth tests a complete scenario with pagination and authentication.
func TestIntegration_WithPaginationAndAuth(t *testing.T) {
pageCount := 0
var pageCount atomic.Int32
expectedAuthHeader := "Bearer test-token-123"

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -181,7 +183,7 @@ func TestIntegration_WithPaginationAndAuth(t *testing.T) {
}
}

pageCount++
pageCount.Add(1)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}))
Expand Down Expand Up @@ -239,18 +241,21 @@ func TestIntegration_WithPaginationAndAuth(t *testing.T) {

// TestIntegration_TimestampPagination tests timestamp-based pagination.
func TestIntegration_TimestampPagination(t *testing.T) {
var mu sync.Mutex
var lastTimestamp string
var pageSize string
pageCount := 0
var pageCount atomic.Int32
initialTime := time.Now().Add(-1 * time.Hour)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Capture the timestamp and page size parameters
mu.Lock()
lastTimestamp = r.URL.Query().Get("t0")
pageSize = r.URL.Query().Get("perPage")
mu.Unlock()

var response []map[string]any
if pageCount == 0 {
if pageCount.Load() == 0 {
// First page - return full page
response = []map[string]any{
{"id": "1", "message": "test1", "ts": time.Now().Add(-30 * time.Minute).Format(time.RFC3339)},
Expand All @@ -262,7 +267,7 @@ func TestIntegration_TimestampPagination(t *testing.T) {
{"id": "3", "message": "test3", "ts": time.Now().Add(-10 * time.Minute).Format(time.RFC3339)},
}
}
pageCount++
pageCount.Add(1)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}))
Expand Down Expand Up @@ -307,20 +312,24 @@ func TestIntegration_TimestampPagination(t *testing.T) {
require.NoError(t, err)

// Verify timestamp parameter was used
require.NotEmpty(t, lastTimestamp)
require.Contains(t, lastTimestamp, "T") // RFC3339 format check
mu.Lock()
ts := lastTimestamp
ps := pageSize
mu.Unlock()
require.NotEmpty(t, ts)
require.Contains(t, ts, "T") // RFC3339 format check
// Verify page size parameter was used
require.Equal(t, "200", pageSize)
require.Equal(t, "200", ps)
// Verify multiple pages were fetched
require.Greater(t, pageCount, 1)
require.Greater(t, int(pageCount.Load()), 1)
}

// TestIntegration_ErrorRecovery tests that the receiver continues polling after errors.
func TestIntegration_ErrorRecovery(t *testing.T) {
requestCount := 0
var requestCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
requestCount++
if requestCount == 1 {
count := requestCount.Add(1)
if count == 1 {
// First request returns error
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("Internal Server Error"))
Expand Down Expand Up @@ -367,7 +376,7 @@ func TestIntegration_ErrorRecovery(t *testing.T) {
require.NoError(t, err)

// Verify receiver continued polling after error
require.Greater(t, requestCount, 1)
require.Greater(t, int(requestCount.Load()), 1)

// Verify some data was eventually collected
allLogs := sink.AllLogs()
Expand Down
Loading