Skip to content

Testing

Testing utilities for Logfire.

TestExporter

TestExporter()

Bases: SpanExporter

A SpanExporter that stores exported spans in a list for asserting in tests.

Source code in logfire/_internal/exporters/test.py
25
26
def __init__(self) -> None:
    self.exported_spans: list[ReadableSpan] = []

export

Exports a batch of telemetry data.

Source code in logfire/_internal/exporters/test.py
28
29
30
31
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
    """Exports a batch of telemetry data."""
    self.exported_spans.extend(spans)
    return SpanExportResult.SUCCESS

clear

clear() -> None

Clears the collected spans.

Source code in logfire/_internal/exporters/test.py
33
34
35
def clear(self) -> None:
    """Clears the collected spans."""
    self.exported_spans = []

exported_spans_as_dict

exported_spans_as_dict(
    fixed_line_number: int | None = 123,
    strip_filepaths: bool = True,
    include_resources: bool = False,
    include_package_versions: bool = False,
    include_instrumentation_scope: bool = False,
    _include_pending_spans: bool = False,
    _strip_function_qualname: bool = True,
) -> list[dict[str, Any]]

The exported spans as a list of dicts.

Parameters:

Name Type Description Default

fixed_line_number

int | None

The line number to use for all spans.

123

strip_filepaths

bool

Whether to strip the filepaths from the exported spans.

True

include_resources

bool

Whether to include the resource attributes in the exported spans.

False

include_package_versions

bool

Whether to include the package versions in the exported spans.

False

include_instrumentation_scope

bool

Whether to include the instrumentation scope in the exported spans.

False

Returns:

Type Description
list[dict[str, Any]]

A list of dicts representing the exported spans.

Source code in logfire/_internal/exporters/test.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def exported_spans_as_dict(
    self,
    fixed_line_number: int | None = 123,
    strip_filepaths: bool = True,
    include_resources: bool = False,
    include_package_versions: bool = False,
    include_instrumentation_scope: bool = False,
    _include_pending_spans: bool = False,
    _strip_function_qualname: bool = True,
) -> list[dict[str, Any]]:
    """The exported spans as a list of dicts.

    Args:
        fixed_line_number: The line number to use for all spans.
        strip_filepaths: Whether to strip the filepaths from the exported spans.
        include_resources: Whether to include the resource attributes in the exported spans.
        include_package_versions: Whether to include the package versions in the exported spans.
        include_instrumentation_scope: Whether to include the instrumentation scope in the exported spans.

    Returns:
        A list of dicts representing the exported spans.
    """

    def process_attribute(name: str, value: Any) -> Any:
        if name == 'code.filepath' and strip_filepaths:
            try:
                return Path(value).name
            except ValueError:  # pragma: no cover
                return value
        if name == 'code.lineno' and fixed_line_number is not None:
            return fixed_line_number
        if name == 'code.function':
            if sys.version_info >= (3, 11) and _strip_function_qualname:
                return value.split('.')[-1]
        if name == ResourceAttributes.PROCESS_PID:
            assert value == os.getpid()
            return 1234
        if name == ResourceAttributes.SERVICE_INSTANCE_ID:
            if re.match(r'^[0-9a-f]{32}$', value):
                return '0' * 32
        return value

    def build_attributes(attributes: Mapping[str, Any] | None) -> dict[str, Any] | None:
        if attributes is None:  # pragma: no branch
            return None  # pragma: no cover
        attributes = {
            k: process_attribute(k, v)
            for k, v in attributes.items()
            if k != RESOURCE_ATTRIBUTES_PACKAGE_VERSIONS or include_package_versions
        }
        if 'telemetry.sdk.version' in attributes:
            attributes['telemetry.sdk.version'] = '0.0.0'
        return attributes

    def build_event(event: Event) -> dict[str, Any]:
        res: dict[str, Any] = {
            'name': event.name,
            'timestamp': event.timestamp,
        }
        if event.attributes:  # pragma: no branch
            res['attributes'] = attributes = dict(event.attributes)
            if SpanAttributes.EXCEPTION_STACKTRACE in attributes:
                last_line = next(  # pragma: no branch
                    line.strip()
                    for line in reversed(
                        cast(str, event.attributes[SpanAttributes.EXCEPTION_STACKTRACE]).split('\n')
                    )
                    if line.strip()
                )
                attributes[SpanAttributes.EXCEPTION_STACKTRACE] = last_line
        return res

    def build_instrumentation_scope(span: ReadableSpan) -> dict[str, Any]:
        if include_instrumentation_scope:
            return {'instrumentation_scope': span.instrumentation_scope and span.instrumentation_scope.name}
        else:
            return {}

    def build_span(span: ReadableSpan) -> dict[str, Any]:
        context = span.context or trace.INVALID_SPAN_CONTEXT
        res: dict[str, Any] = {
            'name': span.name,
            'context': {
                'trace_id': context.trace_id,
                'span_id': context.span_id,
                'is_remote': context.is_remote,
            },
            'parent': {
                'trace_id': span.parent.trace_id,
                'span_id': span.parent.span_id,
                'is_remote': span.parent.is_remote,
            }
            if span.parent
            else None,
            'start_time': span.start_time,
            'end_time': span.end_time,
            **build_instrumentation_scope(span),
            'attributes': build_attributes(span.attributes),
        }
        if span.events:
            res['events'] = [build_event(event) for event in span.events]
        if include_resources:
            resource_attributes = build_attributes(span.resource.attributes)
            res['resource'] = {
                'attributes': resource_attributes,
            }
        return res

    spans = [build_span(span) for span in self.exported_spans]
    return [
        span
        for span in spans
        if _include_pending_spans is True
        or (span.get('attributes', {}).get(ATTRIBUTES_SPAN_TYPE_KEY, 'span') != 'pending_span')
    ]

SeededRandomIdGenerator dataclass

SeededRandomIdGenerator(seed: int | None = 0)

Bases: IdGenerator

Generate random span/trace IDs from a seed for deterministic tests.

Similar to RandomIdGenerator from OpenTelemetry, but with a seed. Set the seed to None for non-deterministic randomness. In that case the difference from RandomIdGenerator is that it's not affected by random.seed(...).

Trace IDs are 128-bit integers. Span IDs are 64-bit integers.

IncrementalIdGenerator dataclass

IncrementalIdGenerator()

Bases: IdGenerator

Generate sequentially incrementing span/trace IDs for testing.

Trace IDs start at 1 and increment by 1 each time. Span IDs start at 1 and increment by 1 each time.

reset_trace_span_ids

reset_trace_span_ids() -> None

Resets the trace and span ids.

Source code in logfire/testing.py
39
40
41
42
def reset_trace_span_ids(self) -> None:  # pragma: no cover
    """Resets the trace and span ids."""
    self.trace_id_counter = 0
    self.span_id_counter = 0

generate_span_id

generate_span_id() -> int

Generates a span id.

Source code in logfire/testing.py
44
45
46
47
48
49
def generate_span_id(self) -> int:
    """Generates a span id."""
    self.span_id_counter += 1
    if self.span_id_counter > 2**64 - 1:  # pragma: no branch
        raise OverflowError('Span ID overflow')  # pragma: no cover
    return self.span_id_counter

generate_trace_id

generate_trace_id() -> int

Generates a trace id.

Source code in logfire/testing.py
51
52
53
54
55
56
def generate_trace_id(self) -> int:
    """Generates a trace id."""
    self.trace_id_counter += 1
    if self.trace_id_counter > 2**128 - 1:  # pragma: no branch
        raise OverflowError('Trace ID overflow')  # pragma: no cover
    return self.trace_id_counter

TimeGenerator

TimeGenerator(ns_time: int = 0)

Generate incrementing timestamps for testing.

Timestamps are in nanoseconds, start at 1_000_000_000, and increment by 1_000_000_000 (1 second) each time.

Source code in logfire/testing.py
66
67
def __init__(self, ns_time: int = 0):
    self.ns_time = ns_time

CaptureLogfire dataclass

CaptureLogfire(
    exporter: TestExporter,
    metrics_reader: InMemoryMetricReader,
)

A dataclass that is holds both span exporter and metric renderer.

This is used as the return type of capfire fixture.

exporter instance-attribute

exporter: TestExporter

The TestExporter instance.

metrics_reader instance-attribute

metrics_reader: InMemoryMetricReader

The InMemoryMetricReader instance.

capfire

capfire() -> CaptureLogfire

A fixture that returns a CaptureLogfire instance.

Source code in logfire/testing.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@pytest.fixture
def capfire() -> CaptureLogfire:
    """A fixture that returns a CaptureLogfire instance."""
    exporter = TestExporter()
    metrics_reader = InMemoryMetricReader()
    logfire.configure(
        send_to_logfire=False,
        console=False,
        advanced=logfire.AdvancedOptions(
            id_generator=IncrementalIdGenerator(),
            ns_timestamp_generator=TimeGenerator(),
        ),
        additional_span_processors=[SimpleSpanProcessor(exporter)],
        metrics=logfire.MetricsOptions(additional_readers=[InMemoryMetricReader()]),
    )

    return CaptureLogfire(exporter=exporter, metrics_reader=metrics_reader)