Skip to content

Recursive tracing issue in database operations when using opentelemetry-instrumentation-psycopg2 with psycopg2's ThreadedConnectionPool #1925

@changemyminds

Description

@changemyminds

Describe your environment

Language & Library

  • python version: 3.6.15
  • pip version: 21.3.1
  • psycopg2 version: 2.9.7
  • opentelemetry-instrumentation-psycopg2 version: 0.33b0
  • opentelemetry-exporter-otlp version: 1.12.0

Infra (Docker Image)

  • jaegertracing/all-in-one:1.42
  • otel/opentelemetry-collector:0.72.0
  • postgres:13.2-alpine

Steps to reproduce

Prepare

  • otel-collector-config.yaml
# OpenTelemetry Collector config: receive OTLP, batch, then log and forward to Jaeger.
receivers:
  otlp:
    # Accept OTLP over both gRPC and HTTP; no endpoints set, so collector defaults apply.
    protocols:
      grpc:
      http:

exporters:
  # Print every received span to the collector's own log (useful for debugging).
  logging:
    loglevel: debug

  # Forward spans to the jaeger-all-in-one service (see docker-compose) without TLS.
  jaeger:
    endpoint: jaeger-all-in-one:14250
    tls:
      insecure: true
processors:
  batch:

service:
  pipelines:
    # Single traces pipeline wiring together the components declared above.
    traces:
      receivers: [otlp]
      exporters: [logging, jaeger]
      processors: [batch]
  • docker-compose.yaml
# Local stack for the reproducer: Jaeger (UI + storage), OTel Collector, PostgreSQL.
version: "3"

services:
  # Jaeger all-in-one; the collector exports to it on port 14250.
  jaeger-all-in-one:
    image: jaegertracing/all-in-one:1.42
    restart: always
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    ports:
      - "16686:16686" # server frontend
      - "14268:14268" # HTTP collector
      - "14250:14250" # gRPC collector

  # OpenTelemetry Collector, configured by otel-collector-config.yaml mounted below;
  # the Python script sends spans to localhost:4317.
  otel-collector:
    image: otel/opentelemetry-collector:0.72.0
    restart: always
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"   # OTLP gRPC receiver
      - "4318:4318"   # OTLP Http receiver
    depends_on:
      - jaeger-all-in-one

  # database: user/password/db name must match the values hard-coded in index.py
  postgres:
    image: postgres:13.2-alpine
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=12345678
      - POSTGRES_DB=example
    ports:
      - "5432:5432"
  • requirements.txt
# Pinned to the exact versions reported in the environment section so the bug reproduces deterministically.
psycopg2==2.9.7
opentelemetry-instrumentation-psycopg2==0.33b0
opentelemetry-exporter-otlp==1.12.0
  • index.py
import logging
import threading
import time
from psycopg2 import pool
import psycopg2
from opentelemetry import trace
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# this database uses the connection pool
class PostgresDatabasePool:
    """Run SQL statements on connections borrowed from a psycopg2 ThreadedConnectionPool."""

    def __init__(self, minconn, maxconn, host, database, user, password, port):
        """Create the pool.

        Args:
            minconn: minimum number of connections the pool keeps open.
            maxconn: maximum number of connections the pool may hand out.
            host, database, user, password, port: connection parameters
                forwarded to every pooled connection.
        """
        self.db_pool = pool.ThreadedConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            host=host,
            database=database,
            user=user,
            password=password,
            port=port
        )

    def execute_query(self, query):
        """Execute ``query`` on a pooled connection and commit it.

        Errors are logged rather than raised. The cursor is always closed and
        the connection is always returned to the pool.
        """
        conn = None
        cursor = None
        try:
            conn = self.db_pool.getconn()
            cursor = conn.cursor()
            cursor.execute(query)
            # Pooled connections are not in autocommit mode. Without an explicit
            # commit, putconn() rolls back (or closes) the connection and the
            # statement's effects are silently discarded.
            conn.commit()
            logging.info("execute command: %s", query)
        except Exception as e:
            logging.error("SQL error: %s", e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                self.db_pool.putconn(conn)

# this database uses a single direct connection (psycopg2.connect)
class PostgresDatabase:
    """Run SQL statements over one shared autocommit psycopg2 connection."""

    def __init__(self, host, database, user, password, port):
        """Open the connection and switch it to autocommit mode."""
        self.conn = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password,
            port=port
        )
        self.conn.autocommit = True
        # Kept for backward compatibility (it also acts as the "connected" flag
        # checked in execute_query); queries use their own short-lived cursors.
        self.cursor = self.conn.cursor()

    def execute_query(self, query):
        """Execute ``query``; errors are logged rather than raised.

        Each call opens and closes its own cursor. psycopg2 connections may be
        shared between threads, but cursors must not be, and this instance is
        used from several threads.
        """
        if not self.cursor:
            logging.warning("Please connect first")
            return
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(query)
            logging.info("execute command: %s", query)
        except Exception as e:
            logging.error("SQL error: %s", e)

def delete1_with_db_pool(thread_name):
    """Loop forever: inside one INTERNAL span, run the test1 DELETE five times
    through the pooled database, then sleep 5 seconds outside the span.

    ``thread_name`` is accepted for the thread's args but not used.
    """
    statement = "DELETE FROM public.test1;"
    while True:
        with tracer.start_as_current_span("delete1_with_db_pool", kind=trace.SpanKind.INTERNAL):
            for _ in range(5):
                dbPool.execute_query(statement)
        time.sleep(5)

def delete2_with_db_pool(thread_name):
    """Loop forever: inside one INTERNAL span, run the test2 DELETE five times
    through the pooled database, then sleep 5 seconds outside the span.

    ``thread_name`` is accepted for the thread's args but not used.
    """
    statement = "DELETE FROM public.test2;"
    while True:
        with tracer.start_as_current_span("delete2_with_db_pool", kind=trace.SpanKind.INTERNAL):
            for _ in range(5):
                dbPool.execute_query(statement)
        time.sleep(5)

def delete1(thread_name):
    """Loop forever: inside one INTERNAL span, run the test1 DELETE five times
    through the direct-connection database, then sleep 5 seconds outside the span.

    ``thread_name`` is accepted for the thread's args but not used.
    """
    statement = "DELETE FROM public.test1;"
    while True:
        with tracer.start_as_current_span("delete1", kind=trace.SpanKind.INTERNAL):
            for _ in range(5):
                db.execute_query(statement)
        time.sleep(5)

def delete2(thread_name):
    """Loop forever: inside one INTERNAL span, run the test2 DELETE five times
    through the direct-connection database, then sleep 5 seconds outside the span.

    ``thread_name`` is accepted for the thread's args but not used.
    """
    statement = "DELETE FROM public.test2;"
    while True:
        with tracer.start_as_current_span("delete2", kind=trace.SpanKind.INTERNAL):
            for _ in range(5):
                db.execute_query(statement)
        time.sleep(5)

# --- tracing: batch spans and ship them over OTLP/gRPC to the local collector ---
resource = Resource(attributes={SERVICE_NAME: 'Demo_Bug'})
provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(endpoint='localhost:4317', insecure=True)
span_processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(span_processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("test", "1.0.0")

# --- instrument psycopg2 before the connections below are created ---
Psycopg2Instrumentor().instrument()

# --- databases: a pool of 1..5 connections and one direct autocommit connection ---
dbPool = PostgresDatabasePool(1, 5, "localhost", "example", "root", "12345678", 5432)
db = PostgresDatabase("localhost", "example", "root", "12345678", 5432)

# --- schema: create the two tables the worker threads delete from ---
for ddl in (
    "CREATE TABLE IF NOT EXISTS public.test1 (id serial  NOT NULL , text varchar(64)  NOT NULL);",
    "CREATE TABLE IF NOT EXISTS public.test2 (id serial  NOT NULL , text varchar(64)  NOT NULL);",
):
    db.execute_query(ddl)

# --- workers: two pooled loops and two direct-connection loops ---
workers = (
    (delete1_with_db_pool, "Thread-1"),
    (delete2_with_db_pool, "Thread-2"),
    (delete1, "Thread-3"),
    (delete2, "Thread-4"),
)
threads = [threading.Thread(target=worker, args=(label,)) for worker, label in workers]

# --- run: start every worker, then block on them (their loops never exit) ---
for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

Run command

# start Jaeger, the OTel Collector and PostgreSQL in the background
docker-compose up -d
# install the Python dependencies listed in requirements.txt
pip install -r requirements.txt
# run the reproducer (its worker threads loop forever and never exit on their own)
python index.py

What is the expected behavior?
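The traces produced by delete1_with_db_pool and delete2_with_db_pool should have the same structure as those produced by delete1 and delete2: each parent span should directly contain its database spans, with no recursive nesting.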

What is the actual behavior?
[Two screenshots of the resulting traces were attached here; they are not preserved in this copy.]

Additional context
I suspect the problem lies in our use of psycopg2's pool.ThreadedConnectionPool class to reuse database connections. It seems that opentelemetry-instrumentation-psycopg2 does not handle this case.

Related issues: #355, #381

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions