Skip to content

Testing

Testing utilities for Logfire.

TestExporter

TestExporter()

Bases: SpanExporter

A SpanExporter that stores exported spans in a list for asserting in tests.

Source code in logfire/_internal/exporters/test.py
30
31
def __init__(self) -> None:
    self.exported_spans: list[ReadableSpan] = []

export

Exports a batch of telemetry data.

Source code in logfire/_internal/exporters/test.py
33
34
35
36
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
    """Exports a batch of telemetry data."""
    self.exported_spans.extend(spans)
    return SpanExportResult.SUCCESS

clear

clear() -> None

Clears the collected spans.

Source code in logfire/_internal/exporters/test.py
38
39
40
def clear(self) -> None:
    """Clears the collected spans."""
    self.exported_spans = []

exported_spans_as_dict

exported_spans_as_dict(
    fixed_line_number: int | None = 123,
    strip_filepaths: bool = True,
    include_resources: bool = False,
    include_instrumentation_scope: bool = False,
    _include_pending_spans: bool = False,
    _strip_function_qualname: bool = True,
) -> list[dict[str, Any]]

The exported spans as a list of dicts.

Parameters:

Name Type Description Default

fixed_line_number

int | None

The line number to use for all spans.

123

strip_filepaths

bool

Whether to strip the filepaths from the exported spans.

True

include_resources

bool

Whether to include the resource attributes in the exported spans.

False

include_instrumentation_scope

bool

Whether to include the instrumentation scope in the exported spans.

False

Returns:

Type Description
list[dict[str, Any]]

A list of dicts representing the exported spans.

Source code in logfire/_internal/exporters/test.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def exported_spans_as_dict(
    self,
    fixed_line_number: int | None = 123,
    strip_filepaths: bool = True,
    include_resources: bool = False,
    include_instrumentation_scope: bool = False,
    _include_pending_spans: bool = False,
    _strip_function_qualname: bool = True,
) -> list[dict[str, Any]]:
    """The exported spans as a list of dicts.

    Args:
        fixed_line_number: The line number to use for all spans.
        strip_filepaths: Whether to strip the filepaths from the exported spans.
        include_resources: Whether to include the resource attributes in the exported spans.
        include_instrumentation_scope: Whether to include the instrumentation scope in the exported spans.

    Returns:
        A list of dicts representing the exported spans.
    """
    _build_attributes = partial(
        build_attributes,
        fixed_line_number=fixed_line_number,
        strip_filepaths=strip_filepaths,
        strip_function_qualname=_strip_function_qualname,
    )

    def build_context(context: trace.SpanContext) -> dict[str, Any]:
        return {'trace_id': context.trace_id, 'span_id': context.span_id, 'is_remote': context.is_remote}

    def build_link(link: trace.Link) -> dict[str, Any]:
        context = link.context or trace.INVALID_SPAN_CONTEXT
        return {'context': build_context(context), 'attributes': _build_attributes(link.attributes)}

    def build_event(event: Event) -> dict[str, Any]:
        res: dict[str, Any] = {'name': event.name, 'timestamp': event.timestamp}
        if event.attributes:  # pragma: no branch
            res['attributes'] = attributes = dict(event.attributes)
            if SpanAttributes.EXCEPTION_STACKTRACE in attributes:
                last_line = next(  # pragma: no branch
                    line.strip()
                    for line in reversed(
                        cast(str, event.attributes[SpanAttributes.EXCEPTION_STACKTRACE]).split('\n')
                    )
                    if line.strip()
                )
                attributes[SpanAttributes.EXCEPTION_STACKTRACE] = last_line
        return res

    def build_instrumentation_scope(span: ReadableSpan) -> dict[str, Any]:
        if include_instrumentation_scope:
            return {'instrumentation_scope': span.instrumentation_scope and span.instrumentation_scope.name}
        else:
            return {}

    def build_span(span: ReadableSpan) -> dict[str, Any]:
        context = span.context or trace.INVALID_SPAN_CONTEXT
        res: dict[str, Any] = {
            'name': span.name,
            'context': build_context(context),
            'parent': build_context(span.parent) if span.parent else None,
            'start_time': span.start_time,
            'end_time': span.end_time,
            **build_instrumentation_scope(span),
            'attributes': _build_attributes(span.attributes),
        }
        if span.links:
            res['links'] = [build_link(link) for link in span.links]
        if span.events:
            res['events'] = [build_event(event) for event in span.events]
        if include_resources:
            resource_attributes = _build_attributes(span.resource.attributes)
            res['resource'] = {
                'attributes': resource_attributes,
            }
        return res

    spans = [build_span(span) for span in self.exported_spans]
    return [
        span
        for span in spans
        if _include_pending_spans is True
        or (span.get('attributes', {}).get(ATTRIBUTES_SPAN_TYPE_KEY, 'span') != 'pending_span')
    ]

TestLogExporter

TestLogExporter(ns_timestamp_generator: Callable[[], int])

Bases: InMemoryLogExporter

A LogExporter that stores exported logs in a list for asserting in tests.

Source code in logfire/_internal/exporters/test.py
174
175
176
def __init__(self, ns_timestamp_generator: typing.Callable[[], int]) -> None:
    super().__init__()
    self.ns_timestamp_generator = ns_timestamp_generator

SeededRandomIdGenerator dataclass

SeededRandomIdGenerator(
    seed: int | None = 0,
    _ms_timestamp_generator: Callable[
        [], int
    ] = _default_ms_timestamp_generator,
)

Bases: IdGenerator

Generate random span/trace IDs from a seed for deterministic tests.

Similar to RandomIdGenerator from OpenTelemetry, but with a seed. Set the seed to None for non-deterministic randomness. In that case the difference from RandomIdGenerator is that it's not affected by random.seed(...).

Trace IDs are 128-bit integers. Span IDs are 64-bit integers.

IncrementalIdGenerator dataclass

IncrementalIdGenerator()

Bases: IdGenerator

Generate sequentially incrementing span/trace IDs for testing.

Trace IDs start at 1 and increment by 1 each time. Span IDs start at 1 and increment by 1 each time.

reset_trace_span_ids

reset_trace_span_ids() -> None

Resets the trace and span ids.

Source code in logfire/testing.py
41
42
43
44
def reset_trace_span_ids(self) -> None:  # pragma: no cover
    """Resets the trace and span ids."""
    self.trace_id_counter = 0
    self.span_id_counter = 0

generate_span_id

generate_span_id() -> int

Generates a span id.

Source code in logfire/testing.py
46
47
48
49
50
51
def generate_span_id(self) -> int:
    """Generates a span id."""
    self.span_id_counter += 1
    if self.span_id_counter > 2**64 - 1:  # pragma: no branch
        raise OverflowError('Span ID overflow')  # pragma: no cover
    return self.span_id_counter

generate_trace_id

generate_trace_id() -> int

Generates a trace id.

Source code in logfire/testing.py
53
54
55
56
57
58
def generate_trace_id(self) -> int:
    """Generates a trace id."""
    self.trace_id_counter += 1
    if self.trace_id_counter > 2**128 - 1:  # pragma: no branch
        raise OverflowError('Trace ID overflow')  # pragma: no cover
    return self.trace_id_counter

TimeGenerator

TimeGenerator(ns_time: int = 0)

Generate incrementing timestamps for testing.

Timestamps are in nanoseconds, start at 1_000_000_000, and increment by 1_000_000_000 (1 second) each time.

Source code in logfire/testing.py
68
69
def __init__(self, ns_time: int = 0):
    self.ns_time = ns_time

CaptureLogfire dataclass

CaptureLogfire(
    exporter: TestExporter,
    metrics_reader: InMemoryMetricReader,
    log_exporter: TestLogExporter,
)

A dataclass that holds a span exporter, log exporter, and metric reader.

This is used as the return type of capfire fixture.

exporter instance-attribute

exporter: TestExporter

The span exporter.

metrics_reader instance-attribute

metrics_reader: InMemoryMetricReader

The metric reader.

log_exporter instance-attribute

log_exporter: TestLogExporter

The log exporter.

capfire

capfire() -> CaptureLogfire

A fixture that returns a CaptureLogfire instance.

Source code in logfire/testing.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@pytest.fixture
def capfire() -> CaptureLogfire:
    """A fixture that returns a CaptureLogfire instance."""
    exporter = TestExporter()
    metrics_reader = InMemoryMetricReader()
    time_generator = TimeGenerator()
    log_exporter = TestLogExporter(time_generator)
    logfire.configure(
        send_to_logfire=False,
        console=False,
        advanced=logfire.AdvancedOptions(
            id_generator=IncrementalIdGenerator(),
            ns_timestamp_generator=time_generator,
            log_record_processors=[SimpleLogRecordProcessor(log_exporter)],
        ),
        additional_span_processors=[SimpleSpanProcessor(exporter)],
        metrics=logfire.MetricsOptions(additional_readers=[InMemoryMetricReader()]),
    )

    return CaptureLogfire(exporter=exporter, metrics_reader=metrics_reader, log_exporter=log_exporter)