Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/plugins-jdk17-test.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ jobs:
- c3p0-0.9.2.x-0.10.x-scenario
- spring-scheduled-6.x-scenario
- caffeine-3.x-scenario
- lettuce-webflux-6x-scenario
steps:
- uses: actions/checkout@v2
with:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/plugins-test.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ jobs:
- kotlin-coroutine-scenario
- lettuce-scenario
- lettuce-6.5.x-scenario
- lettuce-webflux-5x-scenario
- mongodb-3.x-scenario
- mongodb-4.x-scenario
- netty-socketio-scenario
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Release Notes.
* Add `eclipse-temurin:25-jre` as another base image.
* Add JDK25 plugin tests for Spring 6.
* Ignore classes starting with "sun.nio.cs" in bytebuddy due to potential class loading deadlock.
* Added support for Lettuce reactive Redis commands.

All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/242?closed=1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.lettuce.core.protocol.RedisCommand;
import org.apache.skywalking.apm.agent.core.conf.Constants;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
Expand Down Expand Up @@ -57,8 +58,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
}
EnhancedInstance enhancedCommand = (EnhancedInstance) spanCarrierCommand;

Object skyWalkingDynamicField = enhancedCommand.getSkyWalkingDynamicField();

// command has been handle by another channel writer (cluster or sentinel case)
if (enhancedCommand.getSkyWalkingDynamicField() != null) {
if (skyWalkingDynamicField instanceof AbstractSpan) {
//set peer in last channel writer (delegate)
if (peer != null) {
AbstractSpan span = (AbstractSpan) enhancedCommand.getSkyWalkingDynamicField();
Expand All @@ -82,6 +85,16 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
command = "BATCH_WRITE";
}
AbstractSpan span = ContextManager.createExitSpan(operationName, peer);

if (skyWalkingDynamicField instanceof ContextSnapshot) {
ContextSnapshot snapshot = (ContextSnapshot) skyWalkingDynamicField;
if (!ContextManager.isActive()) {
AbstractSpan localSpan = ContextManager.createLocalSpan("RedisReactive/local");
localSpan.setComponent(ComponentsDefine.LETTUCE);
}
ContextManager.continued(snapshot);
}

span.setComponent(ComponentsDefine.LETTUCE);
Tags.CACHE_TYPE.set(span, "Redis");
if (StringUtil.isNotEmpty(key)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.lettuce.common;

import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;

/**
* Interceptor for RedisSubscription constructor.
* <p>
* This interceptor captures the {@link io.lettuce.core.protocol.RedisCommand} instance
* at subscription construction time and stores it into SkyWalking dynamic field.
* </p>
*/
public class RedisSubscriptionConstructorInterceptor implements InstanceConstructorInterceptor {

@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
// allArguments[1] is the RedisCommand passed to the RedisSubscription constructor
objInst.setSkyWalkingDynamicField(allArguments[1]);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.lettuce.common;

import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
import reactor.core.CoreSubscriber;

import java.lang.reflect.Method;

/**
* Interceptor for {@code RedisPublisher.RedisSubscription#subscribe(Subscriber)} method.
*
* <p>
* This interceptor works together with the constructor interceptor of
* {@code RedisSubscription}:
* </p>
*/
public class RedisSubscriptionSubscribeMethodInterceptor implements InstanceMethodsAroundInterceptorV2 {

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) {
if (allArguments[0] instanceof CoreSubscriber) {
CoreSubscriber<?> subscriber = (CoreSubscriber<?>) allArguments[0];
// get ContextSnapshot from reactor context, the snapshot is set to reactor context by any other plugin
// such as DispatcherHandlerHandleMethodInterceptor in spring-webflux-5.x-plugin
Object skywalkingContextSnapshot = subscriber.currentContext().getOrDefault("SKYWALKING_CONTEXT_SNAPSHOT", null);
if (skywalkingContextSnapshot != null) {
((EnhancedInstance) objInst.getSkyWalkingDynamicField()).setSkyWalkingDynamicField(skywalkingContextSnapshot);
}
}
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) {
return ret;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.lettuce.common.define;

import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;

import static net.bytebuddy.matcher.ElementMatchers.any;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

/**
*
*/
public class RedisSubscriptionInstrumentation extends ClassInstanceMethodsEnhancePluginDefineV2 {

private static final String ENHANCE_CLASS = "io.lettuce.core.RedisPublisher$RedisSubscription";

private static final String REDIS_SUBSCRIPTION_SUBSCRIBE_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.common.RedisSubscriptionSubscribeMethodInterceptor";
private static final String REDIS_SUBSCRIPTION_CONST_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.common.RedisSubscriptionConstructorInterceptor";

@Override
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
return new InstanceMethodsInterceptV2Point[]{
new InstanceMethodsInterceptV2Point() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("subscribe");
}

@Override
public String getMethodsInterceptorV2() {
return REDIS_SUBSCRIPTION_SUBSCRIBE_METHOD_INTERCEPTOR;
}

@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}

@Override
public ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}

@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any().and(takesArgument(1, named("io.lettuce.core.protocol.RedisCommand")));
}

@Override
public String getConstructorInterceptor() {
return REDIS_SUBSCRIPTION_CONST_METHOD_INTERCEPTOR;
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
# limitations under the License.

lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.DefaultEndpointInstrumentation
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.RedisCommandInstrumentation
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.RedisCommandInstrumentation
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.RedisSubscriptionInstrumentation
21 changes: 21 additions & 0 deletions test/plugin/scenarios/lettuce-webflux-5x-scenario/bin/startup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

home="$(cd "$(dirname $0)"; pwd)"

java -Dredis.host=${REDIS_SERVERS} -jar -Dskywalking.plugin.lettuce.trace_redis_parameters=true ${agent_opts} ${home}/../libs/lettuce-webflux-5x-scenario.jar &
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
segmentItems:
- serviceName: lettuce-webflux-5x-scenario
segmentSize: nq 0
segments:
- segmentId: not null
spans:
- operationName: /case/healthCheck
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 67
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: url, value: 'http://localhost:8080/case/healthCheck'}
- {key: http.method, value: HEAD}
- {key: http.status_code, value: '200'}
- segmentId: not null
spans:
- operationName: Lettuce/GET
parentSpanId: -1
spanId: 0
spanLayer: Cache
startTime: not null
endTime: not null
componentId: 57
isError: false
spanType: Exit
peer: not null
skipAnalysis: false
tags:
- {key: cache.type, value: Redis}
- {key: cache.key, value: key}
- {key: cache.cmd, value: GET}
- {key: cache.op, value: read}
refs:
- { parentEndpoint: /case/lettuce-case, networkAddress: '', refType: CrossThread,
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: not null, traceId: not null }
- segmentId: not null
spans:
- operationName: Lettuce/SET
parentSpanId: -1
spanId: 0
spanLayer: Cache
startTime: not null
endTime: not null
componentId: 57
isError: false
spanType: Exit
peer: not null
skipAnalysis: false
tags:
- { key: cache.type, value: Redis }
- { key: cache.key, value: key0 }
- { key: cache.cmd, value: SET }
- { key: cache.op, value: write }
refs:
- { parentEndpoint: /case/lettuce-case, networkAddress: '', refType: CrossThread,
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: not null, traceId: not null }
- segmentId: not null
spans:
- operationName: Lettuce/SET
parentSpanId: -1
spanId: 0
spanLayer: Cache
startTime: not null
endTime: not null
componentId: 57
isError: false
spanType: Exit
peer: not null
skipAnalysis: false
tags:
- { key: cache.type, value: Redis }
- { key: cache.key, value: key1 }
- { key: cache.cmd, value: SET }
- { key: cache.op, value: write }
refs:
- { parentEndpoint: /case/lettuce-case, networkAddress: '', refType: CrossThread,
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: not null, traceId: not null }
- segmentId: not null
spans:
- operationName: /case/lettuce-case
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: not null
endTime: not null
componentId: 67
isError: false
spanType: Entry
peer: ''
skipAnalysis: false
tags:
- {key: url, value: 'http://localhost:8080/case/lettuce-case'}
- {key: http.method, value: GET}
- {key: http.status_code, value: '200'}
Loading
Loading