Skip to content

Commit 834b808

Browse files
authored
feat(resources): add resource watch scheduling and status tracking (volcengine#709)
* feat(resource): add watch interval support for resource monitoring implement resource watch functionality that allows automatic monitoring and re-processing of resources at specified intervals. key features include: - add watch_interval parameter to resource APIs - create watch scheduler service for task execution - handle conflict detection for active watch tasks - provide watch status query capability - include comprehensive tests and examples the watch feature enables periodic automatic updates of resources without manual intervention, improving data freshness for frequently changing content * feat(resources): add watch status tracking and improve resource processing - Implement get_watch_status API for tracking resource watch status - Add immediate persistence for first-time resource additions - Improve file change detection with size comparison - Refactor watch scheduler with better concurrency control - Add test coverage for watch status and resource processing - Remove unused watch manager references and clean up code * refactor: improve code style and fix minor issues - Simplify logging by removing redundant data copying - Fix syntax errors in docstrings and string literals - Add new fields to EmbeddingMsg class - Improve line wrapping and formatting - Update watch task storage URIs to use hidden files * refactor(embedding_msg): simplify EmbeddingMsg constructor by removing unused fields Remove media_uri, media_mime_type and id parameters as they are not used in the implementation * feat(watch): add backup task recovery and simplify permission check Add test case for recovering tasks from backup storage when primary is missing Remove require_owner parameter from _check_permission as it's redundant with the existing role-based checks * feat(resources): add watch_interval support for resource updates Add watch_interval parameter to enable periodic resource updates. 
When target is specified, watch_interval > 0 creates/updates a watch task, while <= 0 disables it. Also simplify resource moving logic in ResourceProcessor by using direct mv operation. * refactor(watch): remove deprecated get_watch_status functionality remove get_watch_status method and related tests, update examples to use direct task access update watch manager to use ConflictError for URI conflicts and include original_role in tasks add validation for watch_interval requiring target URI * fix(resource_service): validate watch interval before processing resource Move watch interval validation earlier in the flow to fail fast when 'to' parameter is missing * refactor(resource_processor): remove redundant temp_uri assignment
1 parent 1823a7c commit 834b808

28 files changed

+4146
-112
lines changed

crates/ov_cli/src/client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ impl HttpClient {
480480
include: Option<String>,
481481
exclude: Option<String>,
482482
directly_upload_media: bool,
483+
watch_interval: f64,
483484
) -> Result<serde_json::Value> {
484485
let path_obj = Path::new(path);
485486

@@ -501,6 +502,7 @@ impl HttpClient {
501502
"include": include,
502503
"exclude": exclude,
503504
"directly_upload_media": directly_upload_media,
505+
"watch_interval": watch_interval,
504506
});
505507

506508
self.post("/api/v1/resources", &body).await
@@ -520,6 +522,7 @@ impl HttpClient {
520522
"include": include,
521523
"exclude": exclude,
522524
"directly_upload_media": directly_upload_media,
525+
"watch_interval": watch_interval,
523526
});
524527

525528
self.post("/api/v1/resources", &body).await
@@ -537,6 +540,7 @@ impl HttpClient {
537540
"include": include,
538541
"exclude": exclude,
539542
"directly_upload_media": directly_upload_media,
543+
"watch_interval": watch_interval,
540544
});
541545

542546
self.post("/api/v1/resources", &body).await
@@ -555,6 +559,7 @@ impl HttpClient {
555559
"include": include,
556560
"exclude": exclude,
557561
"directly_upload_media": directly_upload_media,
562+
"watch_interval": watch_interval,
558563
});
559564

560565
self.post("/api/v1/resources", &body).await

crates/ov_cli/src/commands/resources.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub async fn add_resource(
1616
include: Option<String>,
1717
exclude: Option<String>,
1818
directly_upload_media: bool,
19+
watch_interval: f64,
1920
format: OutputFormat,
2021
compact: bool,
2122
) -> Result<()> {
@@ -33,6 +34,7 @@ pub async fn add_resource(
3334
include,
3435
exclude,
3536
directly_upload_media,
37+
watch_interval,
3638
)
3739
.await?;
3840
output_success(&result, format, compact);

crates/ov_cli/src/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ enum Commands {
9696
/// Do not directly upload media files
9797
#[arg(long = "no-directly-upload-media", default_value_t = false)]
9898
no_directly_upload_media: bool,
99+
/// Watch interval in minutes for automatic resource monitoring (0 = no monitoring)
100+
#[arg(long, default_value = "0")]
101+
watch_interval: f64,
99102
},
100103
/// Add a skill into OpenViking
101104
AddSkill {
@@ -525,6 +528,7 @@ async fn main() {
525528
include,
526529
exclude,
527530
no_directly_upload_media,
531+
watch_interval,
528532
} => {
529533
handle_add_resource(
530534
path,
@@ -539,6 +543,7 @@ async fn main() {
539543
include,
540544
exclude,
541545
no_directly_upload_media,
546+
watch_interval,
542547
ctx,
543548
)
544549
.await
@@ -655,6 +660,7 @@ async fn handle_add_resource(
655660
include: Option<String>,
656661
exclude: Option<String>,
657662
no_directly_upload_media: bool,
663+
watch_interval: f64,
658664
ctx: CliContext,
659665
) -> Result<()> {
660666
let is_url = path.starts_with("http://")
@@ -722,6 +728,7 @@ async fn handle_add_resource(
722728
include,
723729
exclude,
724730
directly_upload_media,
731+
watch_interval,
725732
ctx.output_format,
726733
ctx.compact,
727734
).await

docs/en/api/02-resources.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Add a resource to the knowledge base.
4545
| instruction | str | No | "" | Special processing instructions |
4646
| wait | bool | No | False | Wait for semantic processing to complete |
4747
| timeout | float | No | None | Timeout in seconds (only used when wait=True) |
48+
| watch_interval | float | No | 0 | Watch interval (minutes). >0 enables/updates watch; <=0 disables watch. Only takes effect when target is provided |
4849

4950
**Python SDK (Embedded / HTTP)**
5051

@@ -168,6 +169,48 @@ curl -X POST http://localhost:1933/api/v1/system/wait \
168169
openviking add-resource ./documents/guide.md --wait
169170
```
170171

172+
**Example: Watch for Updates (watch_interval)**
173+
174+
`watch_interval` is in minutes and periodically triggers re-processing for the specified target URI:
175+
176+
- `watch_interval > 0`: create a watch task for the `target`, or reactivate a previously disabled one with the new interval
177+
- `watch_interval <= 0`: disable (deactivate) the watch task for the `target`
178+
- watch tasks are only managed when `target` / CLI `--to` is provided
179+
180+
If there is already an active watch task for the same `target`, submitting another request with `watch_interval > 0` returns a conflict error. Disable it first (`watch_interval = 0`) and then set a new interval.
181+
182+
**Python SDK (Embedded / HTTP)**
183+
184+
```python
185+
client.add_resource(
186+
"./documents/guide.md",
187+
target="viking://resources/documents/guide.md",
188+
watch_interval=60,
189+
)
190+
```
191+
192+
**HTTP API**
193+
194+
```bash
195+
curl -X POST http://localhost:1933/api/v1/resources \
196+
-H "Content-Type: application/json" \
197+
-H "X-API-Key: your-key" \
198+
-d '{
199+
"path": "./documents/guide.md",
200+
"target": "viking://resources/documents/guide.md",
201+
"watch_interval": 60
202+
}'
203+
```
204+
205+
**CLI**
206+
207+
```bash
208+
openviking add-resource ./documents/guide.md --to viking://resources/documents/guide.md --watch-interval 60
209+
210+
# Disable watch
211+
openviking add-resource ./documents/guide.md --to viking://resources/documents/guide.md --watch-interval 0
212+
```
213+
171214
---
172215

173216
### export_ovpack()

docs/zh/api/02-resources.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Input -> Parser -> TreeBuilder -> AGFS -> SemanticQueue -> Vector Index
4545
| instruction | str | 否 | "" | 特殊处理指令 |
4646
| wait | bool | 否 | False | 等待语义处理完成 |
4747
| timeout | float | 否 | None | 超时时间(秒),仅在 wait=True 时生效 |
48+
| watch_interval | float | 否 | 0 | 定时更新间隔(分钟)。>0 开启/更新定时任务;<=0 关闭(停用)定时任务。仅在指定 target 时生效 |
4849

4950
**Python SDK (Embedded / HTTP)**
5051

@@ -168,6 +169,48 @@ curl -X POST http://localhost:1933/api/v1/system/wait \
168169
openviking add-resource ./documents/guide.md --wait
169170
```
170171

172+
**示例:开启定时更新(watch_interval)**
173+
174+
`watch_interval` 的单位为分钟,用于对指定的目标 URI 定期触发更新处理:
175+
176+
- `watch_interval > 0`:创建(或重新激活并更新)该 `target` 的定时任务
177+
- `watch_interval <= 0`:关闭(停用)该 `target` 的定时任务
178+
- 只有在指定 `target` / CLI `--to` 时才会管理(创建或停用)定时任务
179+
180+
如果同一个 `target` 已存在激活中的定时任务,再次以 `watch_interval > 0` 提交会返回冲突错误;需要先将 `watch_interval` 设为 `0`(取消/停用)后再重新设置新的间隔。
181+
182+
**Python SDK (Embedded / HTTP)**
183+
184+
```python
185+
client.add_resource(
186+
"./documents/guide.md",
187+
target="viking://resources/documents/guide.md",
188+
watch_interval=60,
189+
)
190+
```
191+
192+
**HTTP API**
193+
194+
```bash
195+
curl -X POST http://localhost:1933/api/v1/resources \
196+
-H "Content-Type: application/json" \
197+
-H "X-API-Key: your-key" \
198+
-d '{
199+
"path": "./documents/guide.md",
200+
"target": "viking://resources/documents/guide.md",
201+
"watch_interval": 60
202+
}'
203+
```
204+
205+
**CLI**
206+
207+
```bash
208+
openviking add-resource ./documents/guide.md --to viking://resources/documents/guide.md --watch-interval 60
209+
210+
# 取消监控
211+
openviking add-resource ./documents/guide.md --to viking://resources/documents/guide.md --watch-interval 0
212+
```
213+
171214
---
172215

173216
### export_ovpack()

examples/watch_resource_example.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
#!/usr/bin/env python3
2+
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
3+
# SPDX-License-Identifier: Apache-2.0
4+
"""
5+
Resource Watch Feature Example
6+
7+
This example demonstrates how to use the resource watch feature in OpenViking.
8+
The watch feature allows you to automatically re-process resources at specified
9+
intervals.
10+
11+
Key features:
12+
- Create resources with watch enabled
13+
- Update watch intervals (cancel then re-create)
14+
- Cancel watch tasks
15+
- Handle conflict errors
16+
"""
17+
18+
import asyncio
19+
from pathlib import Path
20+
21+
from openviking import AsyncOpenViking
22+
from openviking_cli.exceptions import ConflictError
23+
24+
25+
async def example_basic_watch():
26+
client = AsyncOpenViking(path="./data_watch_example")
27+
await client.initialize()
28+
29+
try:
30+
test_file = Path("./test_resource.md")
31+
test_file.write_text(
32+
"""# Test Resource
33+
34+
## Content
35+
This is a test resource for watch functionality.
36+
37+
## Version
38+
Version: 1.0
39+
"""
40+
)
41+
42+
to_uri = "viking://resources/watched_resource"
43+
44+
print("\nAdding resource with watch_interval=60.0 minutes...")
45+
result = await client.add_resource(
46+
path=str(test_file),
47+
to=to_uri,
48+
reason="Example: monitoring a document",
49+
instruction="Check for updates and re-index",
50+
watch_interval=60.0,
51+
)
52+
53+
print("Resource added successfully!")
54+
print(f" Root URI: {result['root_uri']}")
55+
finally:
56+
await client.close()
57+
58+
59+
async def example_update_watch_interval():
60+
client = AsyncOpenViking(path="./data_watch_example")
61+
await client.initialize()
62+
63+
try:
64+
test_file = Path("./test_resource.md")
65+
to_uri = "viking://resources/watched_resource"
66+
67+
print("\nUpdating watch interval by canceling then re-creating...")
68+
await client.add_resource(
69+
path=str(test_file),
70+
to=to_uri,
71+
watch_interval=0,
72+
)
73+
await client.add_resource(
74+
path=str(test_file),
75+
to=to_uri,
76+
reason="Updated: more frequent monitoring",
77+
watch_interval=120.0,
78+
)
79+
print("Watch task updated successfully!")
80+
finally:
81+
await client.close()
82+
83+
84+
async def example_cancel_watch():
85+
client = AsyncOpenViking(path="./data_watch_example")
86+
await client.initialize()
87+
88+
try:
89+
test_file = Path("./test_resource.md")
90+
to_uri = "viking://resources/watched_resource"
91+
92+
print("\nCancelling watch by setting interval to 0...")
93+
await client.add_resource(
94+
path=str(test_file),
95+
to=to_uri,
96+
watch_interval=0,
97+
)
98+
print("Watch task cancelled successfully!")
99+
finally:
100+
await client.close()
101+
102+
103+
async def example_handle_conflict():
104+
client = AsyncOpenViking(path="./data_watch_example")
105+
await client.initialize()
106+
107+
try:
108+
test_file = Path("./test_resource.md")
109+
to_uri = "viking://resources/conflict_example"
110+
111+
print("\nCreating first watch task...")
112+
await client.add_resource(
113+
path=str(test_file),
114+
to=to_uri,
115+
watch_interval=30.0,
116+
)
117+
print(" First watch task created successfully")
118+
119+
print("\nAttempting to create second watch task for same URI...")
120+
try:
121+
await client.add_resource(
122+
path=str(test_file),
123+
to=to_uri,
124+
watch_interval=60.0,
125+
)
126+
print(" ERROR: This should not happen!")
127+
except ConflictError as e:
128+
print(" ConflictError caught as expected!")
129+
print(f" Error message: {e}")
130+
finally:
131+
await client.close()
132+
133+
134+
async def main():
135+
print("\n" + "=" * 60)
136+
print("OpenViking Resource Watch Examples")
137+
print("=" * 60)
138+
139+
await example_basic_watch()
140+
await example_update_watch_interval()
141+
await example_cancel_watch()
142+
await example_handle_conflict()
143+
144+
print("\n" + "=" * 60)
145+
print("All examples completed!")
146+
print("=" * 60)
147+
148+
149+
if __name__ == "__main__":
150+
asyncio.run(main())
151+

openviking/async_client.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ async def add_resource(
190190
timeout: float = None,
191191
build_index: bool = True,
192192
summarize: bool = False,
193+
watch_interval: float = 0,
193194
telemetry: TelemetryRequest = False,
194195
**kwargs,
195196
) -> Dict[str, Any]:
@@ -223,9 +224,14 @@ async def add_resource(
223224
build_index=build_index,
224225
summarize=summarize,
225226
telemetry=telemetry,
227+
watch_interval=watch_interval,
226228
**kwargs,
227229
)
228230

231+
@property
232+
def _service(self):
233+
return self._client.service
234+
229235
async def wait_processed(self, timeout: float = None) -> Dict[str, Any]:
230236
"""Wait for all queued processing to complete."""
231237
await self._ensure_initialized()

0 commit comments

Comments
 (0)