Skip to content

Logfire

Logfire is the observability tool focused on developer experience.

Logfire

Logfire(
    *,
    config: LogfireConfig = GLOBAL_CONFIG,
    sample_rate: float | None = None,
    tags: Sequence[str] = (),
    console_log: bool = True,
    otel_scope: str = "logfire"
)

The main logfire class.

Source code in logfire/_internal/main.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def __init__(
    self,
    *,
    config: LogfireConfig = GLOBAL_CONFIG,
    sample_rate: float | None = None,
    tags: Sequence[str] = (),
    console_log: bool = True,
    otel_scope: str = 'logfire',
) -> None:
    """Create a Logfire instance bound to the given configuration.

    Args:
        config: The configuration object this instance emits through.
        sample_rate: Optional sampling rate; ``None`` means no instance-level override.
        tags: Tags attached to everything emitted by this instance.
        console_log: Whether output is also written to the console.
        otel_scope: The OpenTelemetry instrumentation scope name.
    """
    self._config = config
    self._console_log = console_log
    self._otel_scope = otel_scope
    self._sample_rate = sample_rate
    self._tags = tuple(tags)

trace

trace(
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None

Log a trace message.

import logfire

logfire.configure()

logfire.trace('This is a trace log')

Parameters:

Name Type Description Default

msg_template

str

The message to log.

required

attributes

Any

The attributes to bind to the log.

{}

_tags

Sequence[str] | None

An optional sequence of tags to include in the log.

None

_exc_info

ExcInfo

Set to an exception or a tuple as returned by sys.exc_info() to record a traceback with the log message.

Set to True to use the currently handled exception.

False
Source code in logfire/_internal/main.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def trace(
    self,
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None:
    """Log a trace message.

    ```py
    import logfire

    logfire.configure()

    logfire.trace('This is a trace log')
    ```

    Args:
        msg_template: The message to log.
        attributes: The attributes to bind to the log.
        _tags: An optional sequence of tags to include in the log.
        _exc_info: Set to an exception or a tuple as returned by [`sys.exc_info()`][sys.exc_info]
            to record a traceback with the log message.

            Set to `True` to use the currently handled exception.
    """
    if any(k.startswith('_') for k in attributes):
        raise ValueError('Attribute keys cannot start with an underscore.')
    self.log('trace', msg_template, attributes, tags=_tags, exc_info=_exc_info)

debug

debug(
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None

Log a debug message.

import logfire

logfire.configure()

logfire.debug('This is a debug log')

Parameters:

Name Type Description Default

msg_template

str

The message to log.

required

attributes

Any

The attributes to bind to the log.

{}

_tags

Sequence[str] | None

An optional sequence of tags to include in the log.

None

_exc_info

ExcInfo

Set to an exception or a tuple as returned by sys.exc_info() to record a traceback with the log message.

Set to True to use the currently handled exception.

False
Source code in logfire/_internal/main.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def debug(
    self,
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None:
    """Log a debug message.

    ```py
    import logfire

    logfire.configure()

    logfire.debug('This is a debug log')
    ```

    Args:
        msg_template: The message to log.
        attributes: The attributes to bind to the log.
        _tags: An optional sequence of tags to include in the log.
        _exc_info: Set to an exception or a tuple as returned by [`sys.exc_info()`][sys.exc_info]
            to record a traceback with the log message.

            Set to `True` to use the currently handled exception.
    """
    if any(k.startswith('_') for k in attributes):
        raise ValueError('Attribute keys cannot start with an underscore.')
    self.log('debug', msg_template, attributes, tags=_tags, exc_info=_exc_info)

info

info(
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None

Log an info message.

import logfire

logfire.configure()

logfire.info('This is an info log')

Parameters:

Name Type Description Default

msg_template

str

The message to log.

required

attributes

Any

The attributes to bind to the log.

{}

_tags

Sequence[str] | None

An optional sequence of tags to include in the log.

None

_exc_info

ExcInfo

Set to an exception or a tuple as returned by sys.exc_info() to record a traceback with the log message.

Set to True to use the currently handled exception.

False
Source code in logfire/_internal/main.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def info(
    self,
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None:
    """Log an info message.

    ```py
    import logfire

    logfire.configure()

    logfire.info('This is an info log')
    ```

    Args:
        msg_template: The message to log.
        attributes: The attributes to bind to the log.
        _tags: An optional sequence of tags to include in the log.
        _exc_info: Set to an exception or a tuple as returned by [`sys.exc_info()`][sys.exc_info]
            to record a traceback with the log message.

            Set to `True` to use the currently handled exception.
    """
    if any(k.startswith('_') for k in attributes):
        raise ValueError('Attribute keys cannot start with an underscore.')
    self.log('info', msg_template, attributes, tags=_tags, exc_info=_exc_info)

notice

notice(
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None

Log a notice message.

import logfire

logfire.configure()

logfire.notice('This is a notice log')

Parameters:

Name Type Description Default

msg_template

str

The message to log.

required

attributes

Any

The attributes to bind to the log.

{}

_tags

Sequence[str] | None

An optional sequence of tags to include in the log.

None

_exc_info

ExcInfo

Set to an exception or a tuple as returned by sys.exc_info() to record a traceback with the log message.

Set to True to use the currently handled exception.

False
Source code in logfire/_internal/main.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def notice(
    self,
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None:
    """Log a notice message.

    ```py
    import logfire

    logfire.configure()

    logfire.notice('This is a notice log')
    ```

    Args:
        msg_template: The message to log.
        attributes: The attributes to bind to the log.
        _tags: An optional sequence of tags to include in the log.
        _exc_info: Set to an exception or a tuple as returned by [`sys.exc_info()`][sys.exc_info]
            to record a traceback with the log message.

            Set to `True` to use the currently handled exception.
    """
    if any(k.startswith('_') for k in attributes):
        raise ValueError('Attribute keys cannot start with an underscore.')
    self.log('notice', msg_template, attributes, tags=_tags, exc_info=_exc_info)

warn

warn(
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None

Log a warning message.

import logfire

logfire.configure()

logfire.warn('This is a warning log')

Parameters:

Name Type Description Default

msg_template

str

The message to log.

required

attributes

Any

The attributes to bind to the log.

{}

_tags

Sequence[str] | None

An optional sequence of tags to include in the log.

None

_exc_info

ExcInfo

Set to an exception or a tuple as returned by sys.exc_info() to record a traceback with the log message.

Set to True to use the currently handled exception.

False
Source code in logfire/_internal/main.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def warn(
    self,
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None:
    """Log a warning message.

    ```py
    import logfire

    logfire.configure()

    logfire.warn('This is a warning log')
    ```

    Args:
        msg_template: The message to log.
        attributes: The attributes to bind to the log.
        _tags: An optional sequence of tags to include in the log.
        _exc_info: Set to an exception or a tuple as returned by [`sys.exc_info()`][sys.exc_info]
            to record a traceback with the log message.

            Set to `True` to use the currently handled exception.
    """
    if any(k.startswith('_') for k in attributes):
        raise ValueError('Attribute keys cannot start with an underscore.')
    self.log('warn', msg_template, attributes, tags=_tags, exc_info=_exc_info)

error

error(
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None

Log an error message.

import logfire

logfire.configure()

logfire.error('This is an error log')

Parameters:

Name Type Description Default

msg_template

str

The message to log.

required

attributes

Any

The attributes to bind to the log.

{}

_tags

Sequence[str] | None

An optional sequence of tags to include in the log.

None

_exc_info

ExcInfo

Set to an exception or a tuple as returned by sys.exc_info() to record a traceback with the log message.

Set to True to use the currently handled exception.

False
Source code in logfire/_internal/main.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def error(
    self,
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None:
    """Log an error message.

    ```py
    import logfire

    logfire.configure()

    logfire.error('This is an error log')
    ```

    Args:
        msg_template: The message to log.
        attributes: The attributes to bind to the log.
        _tags: An optional sequence of tags to include in the log.
        _exc_info: Set to an exception or a tuple as returned by [`sys.exc_info()`][sys.exc_info]
            to record a traceback with the log message.

            Set to `True` to use the currently handled exception.
    """
    if any(k.startswith('_') for k in attributes):
        raise ValueError('Attribute keys cannot start with an underscore.')
    self.log('error', msg_template, attributes, tags=_tags, exc_info=_exc_info)

fatal

fatal(
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None

Log a fatal message.

import logfire

logfire.configure()

logfire.fatal('This is a fatal log')

Parameters:

Name Type Description Default

msg_template

str

The message to log.

required

attributes

Any

The attributes to bind to the log.

{}

_tags

Sequence[str] | None

An optional sequence of tags to include in the log.

None

_exc_info

ExcInfo

Set to an exception or a tuple as returned by sys.exc_info() to record a traceback with the log message.

Set to True to use the currently handled exception.

False
Source code in logfire/_internal/main.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
def fatal(
    self,
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = False,
    **attributes: Any,
) -> None:
    """Log a fatal message.

    ```py
    import logfire

    logfire.configure()

    logfire.fatal('This is a fatal log')
    ```

    Args:
        msg_template: The message to log.
        attributes: The attributes to bind to the log.
        _tags: An optional sequence of tags to include in the log.
        _exc_info: Set to an exception or a tuple as returned by [`sys.exc_info()`][sys.exc_info]
            to record a traceback with the log message.

            Set to `True` to use the currently handled exception.
    """
    if any(k.startswith('_') for k in attributes):
        raise ValueError('Attribute keys cannot start with an underscore.')
    self.log('fatal', msg_template, attributes, tags=_tags, exc_info=_exc_info)

exception

exception(
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = True,
    **attributes: Any,
) -> None

The same as error but with _exc_info=True by default.

This means that a traceback will be logged for any currently handled exception.

Parameters:

Name Type Description Default

msg_template

str

The message to log.

required

attributes

Any

The attributes to bind to the log.

{}

_tags

Sequence[str] | None

An optional sequence of tags to include in the log.

None

_exc_info

ExcInfo

Set to an exception or a tuple as returned by sys.exc_info() to record a traceback with the log message.

True
Source code in logfire/_internal/main.py
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
def exception(
    self,
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _exc_info: ExcInfo = True,
    **attributes: Any,
) -> None:
    """The same as `error` but with `_exc_info=True` by default.

    This means that a traceback will be logged for any currently handled exception.

    Args:
        msg_template: The message to log.
        attributes: The attributes to bind to the log.
        _tags: An optional sequence of tags to include in the log.
        _exc_info: Set to an exception or a tuple as returned by [`sys.exc_info()`][sys.exc_info]
            to record a traceback with the log message.
    """
    if any(k.startswith('_') for k in attributes):  # pragma: no cover
        raise ValueError('Attribute keys cannot start with an underscore.')
    self.log('error', msg_template, attributes, tags=_tags, exc_info=_exc_info)

span

span(
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _span_name: str | None = None,
    _level: LevelName | None = None,
    **attributes: Any,
) -> LogfireSpan

Context manager for creating a span.

import logfire

logfire.configure()

with logfire.span('This is a span {a=}', a='data'):
    logfire.info('new log 1')

Parameters:

Name Type Description Default

msg_template

str

The template for the span message.

required

_span_name

str | None

The span name. If not provided, the msg_template will be used.

None

_tags

Sequence[str] | None

An optional sequence of tags to include in the span.

None

_level

LevelName | None

An optional log level name.

None

attributes

Any

The arguments to include in the span and format the message template with. Attributes starting with an underscore are not allowed.

{}
Source code in logfire/_internal/main.py
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
def span(
    self,
    msg_template: str,
    /,
    *,
    _tags: Sequence[str] | None = None,
    _span_name: str | None = None,
    _level: LevelName | None = None,
    **attributes: Any,
) -> LogfireSpan:
    """Context manager for creating a span.

    ```py
    import logfire

    logfire.configure()

    with logfire.span('This is a span {a=}', a='data'):
        logfire.info('new log 1')
    ```

    Args:
        msg_template: The template for the span message.
        _span_name: The span name. If not provided, the `msg_template` will be used.
        _tags: An optional sequence of tags to include in the span.
        _level: An optional log level name.
        attributes: The arguments to include in the span and format the message template with.
            Attributes starting with an underscore are not allowed.
    """
    if any(k.startswith('_') for k in attributes):
        raise ValueError('Attribute keys cannot start with an underscore.')
    return self._span(
        msg_template,
        attributes,
        _tags=_tags,
        _span_name=_span_name,
        _level=_level,
    )

instrument

instrument(
    msg_template: LiteralString | None = None,
    *,
    span_name: str | None = None,
    extract_args: bool = True
) -> Callable[[Callable[P, R]], Callable[P, R]]

Decorator for instrumenting a function as a span.

import logfire

logfire.configure()


@logfire.instrument('This is a span {a=}')
def my_function(a: int):
    logfire.info('new log {a=}', a=a)

Note

  • This decorator MUST be applied first, i.e. UNDER any other decorators.
  • The source code of the function MUST be accessible.

Parameters:

Name Type Description Default

msg_template

LiteralString | None

The template for the span message. If not provided, the module and function name will be used.

None

span_name

str | None

The span name. If not provided, the msg_template will be used.

None

extract_args

bool

Whether to extract arguments from the function signature and log them as span attributes.

True
Source code in logfire/_internal/main.py
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
def instrument(
    self,
    msg_template: LiteralString | None = None,
    *,
    span_name: str | None = None,
    extract_args: bool = True,
) -> Callable[[Callable[P, R]], Callable[P, R]]:
    """Decorator that wraps each call to the decorated function in a span.

    ```py
    import logfire

    logfire.configure()


    @logfire.instrument('This is a span {a=}')
    def my_function(a: int):
        logfire.info('new log {a=}', a=a)
    ```

    !!! note
        - This decorator MUST be applied first, i.e. UNDER any other decorators.
        - The source code of the function MUST be accessible.

    Args:
        msg_template: The template for the span message; the module and function
            name are used when this is not given.
        span_name: The span name; the `msg_template` is used when this is not given.
        extract_args: Whether the function's arguments are extracted and recorded
            as span attributes.
    """
    # Bundle this instance's settings with the decorator options and delegate
    # to the module-level `instrument` helper.
    decorator_args = LogfireArgs(tuple(self._tags), self._sample_rate, msg_template, span_name, extract_args)
    return instrument(self, decorator_args)

log

log(
    level: LevelName | int,
    msg_template: str,
    attributes: dict[str, Any] | None = None,
    tags: Sequence[str] | None = None,
    exc_info: ExcInfo = False,
    console_log: bool | None = None,
) -> None

Log a message.

import logfire

logfire.configure()

logfire.log('info', 'This is a log {a}', {'a': 'Apple'})

Parameters:

Name Type Description Default

level

LevelName | int

The level of the log.

required

msg_template

str

The message to log.

required

attributes

dict[str, Any] | None

The attributes to bind to the log.

None

tags

Sequence[str] | None

An optional sequence of tags to include in the log.

None

exc_info

ExcInfo

Set to an exception or a tuple as returned by sys.exc_info() to record a traceback with the log message.

Set to True to use the currently handled exception.

False

console_log

bool | None

Whether to log to the console, defaults to True.

None
Source code in logfire/_internal/main.py
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
def log(
    self,
    level: LevelName | int,
    msg_template: str,
    attributes: dict[str, Any] | None = None,
    tags: Sequence[str] | None = None,
    exc_info: ExcInfo = False,
    console_log: bool | None = None,
) -> None:
    """Log a message.

    ```py
    import logfire

    logfire.configure()

    logfire.log('info', 'This is a log {a}', {'a': 'Apple'})
    ```

    Args:
        level: The level of the log.
        msg_template: The message to log.
        attributes: The attributes to bind to the log.
        tags: An optional sequence of tags to include in the log.
        exc_info: Set to an exception or a tuple as returned by [`sys.exc_info()`][sys.exc_info]
            to record a traceback with the log message.

            Set to `True` to use the currently handled exception.
        console_log: Whether to log to the console, defaults to `True`.
    """
    with handle_internal_errors():
        stack_info = get_user_stack_info()

        attributes = attributes or {}
        # Caller-supplied attributes take precedence over stack info on key collisions.
        merged_attributes = {**stack_info, **attributes}
        if (msg := attributes.pop(ATTRIBUTES_MESSAGE_KEY, None)) is None:
            # No pre-formatted message was supplied, so format the template here.
            fstring_frame = None
            if self._config.inspect_arguments:
                fstring_frame = inspect.currentframe()
                if fstring_frame.f_back.f_code.co_filename == Logfire.log.__code__.co_filename:  # type: ignore
                    # fstring_frame.f_back should be the user's frame.
                    # The user called logfire.info or a similar method rather than calling logfire.log directly.
                    fstring_frame = fstring_frame.f_back  # type: ignore

            msg, extra_attrs, msg_template = logfire_format_with_magic(
                msg_template,
                merged_attributes,
                self._config.scrubber,
                fstring_frame=fstring_frame,
            )
            if extra_attrs:
                merged_attributes.update(extra_attrs)
                # Only do this if extra_attrs is not empty since the copy of `attributes` might be expensive.
                # We update both because attributes_json_schema_properties looks at `attributes`.
                attributes = {**attributes, **extra_attrs}
        else:
            # The message has already been filled in, presumably by a logging integration.
            # Make sure it's a string.
            msg = merged_attributes[ATTRIBUTES_MESSAGE_KEY] = str(msg)
            msg_template = str(msg_template)

        otlp_attributes = user_attributes(merged_attributes)
        # User attributes are spread last, so they win over the fixed keys on collision.
        otlp_attributes = {
            ATTRIBUTES_SPAN_TYPE_KEY: 'log',
            **log_level_attributes(level),
            ATTRIBUTES_MESSAGE_TEMPLATE_KEY: msg_template,
            ATTRIBUTES_MESSAGE_KEY: msg,
            **otlp_attributes,
        }
        if json_schema_properties := attributes_json_schema_properties(attributes):
            otlp_attributes[ATTRIBUTES_JSON_SCHEMA_KEY] = attributes_json_schema(json_schema_properties)

        # Instance-level tags come first, then the per-call tags.
        tags = self._tags + tuple(tags or ())
        if tags:
            otlp_attributes[ATTRIBUTES_TAGS_KEY] = uniquify_sequence(tags)

        # An instance-level sample rate takes precedence over one passed via attributes.
        sample_rate = (
            self._sample_rate
            if self._sample_rate is not None
            else otlp_attributes.pop(ATTRIBUTES_SAMPLE_RATE_KEY, None)
        )
        if sample_rate is not None and sample_rate != 1:  # pragma: no cover
            otlp_attributes[ATTRIBUTES_SAMPLE_RATE_KEY] = sample_rate

        if not (self._console_log if console_log is None else console_log):
            # Per-call console_log overrides the instance default; mark the record
            # so that console output is suppressed for it.
            otlp_attributes[DISABLE_CONSOLE_KEY] = True
        start_time = self._config.advanced.ns_timestamp_generator()

        span = self._logs_tracer.start_span(
            msg_template,
            attributes=otlp_attributes,
            start_time=start_time,
        )

        if exc_info:
            if exc_info is True:
                # Use the exception currently being handled, if any.
                exc_info = sys.exc_info()
            if isinstance(exc_info, tuple):
                # Normalize a sys.exc_info()-style tuple down to the exception itself.
                exc_info = exc_info[1]
            if isinstance(exc_info, BaseException):
                _record_exception(span, exc_info)
                if otlp_attributes[ATTRIBUTES_LOG_LEVEL_NUM_KEY] >= LEVEL_NUMBERS['error']:  # type: ignore
                    # Set the status description to the exception message.
                    # OTEL only lets us set the description when the status code is ERROR,
                    # which we only want to do when the log level is error.
                    _set_exception_status(span, exc_info)
            elif exc_info is not None:  # pragma: no cover
                raise TypeError(f'Invalid type for exc_info: {exc_info.__class__.__name__}')

        # A log is represented as a zero-duration span: it ends at the same
        # timestamp it started.
        span.end(start_time)

with_tags

with_tags(*tags: str) -> Logfire

A new Logfire instance which always uses the given tags.

import logfire

logfire.configure()

local_logfire = logfire.with_tags('tag1')
local_logfire.info('a log message', _tags=['tag2'])

# This is equivalent to:
logfire.info('a log message', _tags=['tag1', 'tag2'])

Parameters:

Name Type Description Default

tags

str

The tags to add.

()

Returns:

Type Description
Logfire

A new Logfire instance with the tags added to any existing tags.

Source code in logfire/_internal/main.py
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
def with_tags(self, *tags: str) -> Logfire:
    """Return a new Logfire instance that always includes the given tags.

    ```py
    import logfire

    logfire.configure()

    local_logfire = logfire.with_tags('tag1')
    local_logfire.info('a log message', _tags=['tag2'])

    # This is equivalent to:
    logfire.info('a log message', _tags=['tag1', 'tag2'])
    ```

    Args:
        tags: The tags to add.

    Returns:
        A new Logfire instance combining `tags` with any existing tags.
    """
    # Delegates to with_settings, which handles merging with existing tags.
    return self.with_settings(tags=tags)

with_settings

with_settings(
    *,
    tags: Sequence[str] = (),
    stack_offset: int | None = None,
    console_log: bool | None = None,
    custom_scope_suffix: str | None = None
) -> Logfire

A new Logfire instance which uses the given settings.

Parameters:

Name Type Description Default

tags

Sequence[str]

Sequence of tags to include in the log.

()

stack_offset

int | None

The stack level offset to use when collecting stack info, also affects the warning which message formatting might emit, defaults to 0 which means the stack info will be collected from the position where logfire.log was called.

None

console_log

bool | None

Whether to log to the console, defaults to True.

None

custom_scope_suffix

str | None

A custom suffix to append to logfire. e.g. logfire.loguru.

It should only be used when instrumenting another library with Logfire, such as structlog or loguru.

See the instrumenting_module_name parameter on TracerProvider.get_tracer for more info.

None

Returns:

Type Description
Logfire

A new Logfire instance with the given settings applied.

Source code in logfire/_internal/main.py
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
def with_settings(
    self,
    *,
    tags: Sequence[str] = (),
    stack_offset: int | None = None,
    console_log: bool | None = None,
    custom_scope_suffix: str | None = None,
) -> Logfire:
    """A new Logfire instance which uses the given settings.

    Args:
        tags: Sequence of tags to include in the log.
        stack_offset: The stack level offset to use when collecting stack info, also affects the warning which
            message formatting might emit, defaults to `0` which means the stack info will be collected from the
            position where [`logfire.log`][logfire.Logfire.log] was called.
        console_log: Whether to log to the console, defaults to `True`.
        custom_scope_suffix: A custom suffix to append to `logfire.` e.g. `logfire.loguru`.

            It should only be used when instrumenting another library with Logfire, such as structlog or loguru.

            See the `instrumenting_module_name` parameter on
            [TracerProvider.get_tracer][opentelemetry.sdk.trace.TracerProvider.get_tracer] for more info.

    Returns:
        A new Logfire instance with the given settings applied.
    """
    # TODO add sample_rate once it's more stable
    return Logfire(
        config=self._config,
        tags=self._tags + tuple(tags),
        sample_rate=self._sample_rate,
        console_log=self._console_log if console_log is None else console_log,
        otel_scope=self._otel_scope if custom_scope_suffix is None else f'logfire.{custom_scope_suffix}',
    )

force_flush

force_flush(timeout_millis: int = 3000) -> bool

Force flush all spans and metrics.

Parameters:

Name Type Description Default

timeout_millis

int

The timeout in milliseconds.

3000

Returns:

Type Description
bool

Whether the flush of spans was successful.

Source code in logfire/_internal/main.py
735
736
737
738
739
740
741
742
743
744
def force_flush(self, timeout_millis: int = 3_000) -> bool:  # pragma: no cover
    """Immediately flush all pending spans and metrics.

    Args:
        timeout_millis: How long to wait for the flush, in milliseconds.

    Returns:
        Whether the flush of spans was successful.
    """
    # The configuration object owns the exporters, so it performs the flush.
    return self._config.force_flush(timeout_millis)

log_slow_async_callbacks

log_slow_async_callbacks(
    slow_duration: float = 0.1,
) -> ContextManager[None]

Log a warning whenever a function running in the asyncio event loop blocks for too long.

This works by patching the asyncio.events.Handle._run method.

Parameters:

Name Type Description Default

slow_duration

float

the threshold in seconds for when a callback is considered slow.

0.1

Returns:

Type Description
ContextManager[None]

A context manager that will revert the patch when exited. This context manager doesn't take into account threads or other concurrency. Calling this method will immediately apply the patch without waiting for the context manager to be opened, i.e. it's not necessary to use this as a context manager.

Source code in logfire/_internal/main.py
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
def log_slow_async_callbacks(self, slow_duration: float = 0.1) -> ContextManager[None]:
    """Emit a warning log whenever a callback on the asyncio event loop blocks for too long.

    Implemented by patching the `asyncio.events.Handle._run` method.

    Args:
        slow_duration: Threshold in seconds above which a callback counts as slow.

    Returns:
        A context manager that reverts the patch on exit.
            The patch is applied immediately when this method is called, before the
            context manager is ever entered, so using it as a context manager is
            optional. No accounting is done for threads or other concurrency.
    """
    undo_patch = async_.log_slow_callbacks(self, slow_duration)
    return undo_patch

install_auto_tracing

install_auto_tracing(
    modules: (
        Sequence[str] | Callable[[AutoTraceModule], bool]
    ),
    *,
    min_duration: float,
    check_imported_modules: Literal[
        "error", "warn", "ignore"
    ] = "error"
) -> None

Install automatic tracing.

See the Auto-Tracing guide for more info.

This will trace all non-generator function calls in the modules specified by the modules argument. It's equivalent to wrapping the body of every function in matching modules in with logfire.span(...):.

Note

This function MUST be called before any of the modules to be traced are imported.

Generator functions will not be traced for reasons explained here.

This works by inserting a new meta path finder into sys.meta_path, so inserting another finder before it may prevent it from working.

It relies on being able to retrieve the source code via at least one other existing finder in the meta path, so it may not work if standard finders are not present or if the source code is not available. A modified version of the source code is then compiled and executed in place of the original module.

Parameters:

Name Type Description Default

modules

Sequence[str] | Callable[[AutoTraceModule], bool]

List of module names to trace, or a function which returns True for modules that should be traced. If a list is provided, any submodules within a given module will also be traced.

required

min_duration

float

A minimum duration in seconds for which a function must run before it's traced. Setting to 0 causes all functions to be traced from the beginning. Otherwise, the first time(s) each function is called, it will be timed but not traced. Only after the function has run for at least min_duration will it be traced in subsequent calls.

required

check_imported_modules

Literal['error', 'warn', 'ignore']

If this is 'error' (the default), then an exception will be raised if any of the modules in sys.modules (i.e. modules that have already been imported) match the modules to trace. Set to 'warn' to issue a warning instead, or 'ignore' to skip the check.

'error'
Source code in logfire/_internal/main.py
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
def install_auto_tracing(
    self,
    modules: Sequence[str] | Callable[[AutoTraceModule], bool],
    *,
    min_duration: float,
    check_imported_modules: Literal['error', 'warn', 'ignore'] = 'error',
) -> None:
    """Install automatic tracing.

    See the [Auto-Tracing guide](https://logfire.pydantic.dev/docs/guides/onboarding_checklist/add_auto_tracing/)
    for more info.

    Every non-generator function call in a matching module is traced, as if the
    function body were wrapped in `with logfire.span(...):`.

    !!! note
        This function MUST be called before any of the modules to be traced are imported.

        Generator functions will not be traced for reasons explained [here](https://logfire.pydantic.dev/docs/guides/advanced/generators/).

    Tracing is achieved by inserting a new meta path finder into `sys.meta_path`;
    a finder inserted ahead of it later may prevent it from working.

    The source code of each module must be retrievable via at least one other finder
    in the meta path, so this may fail when the standard finders are missing or the
    source is unavailable. A rewritten version of the source is then compiled and
    executed in place of the original module.

    Args:
        modules: List of module names to trace, or a function which returns True for modules that should be traced.
            If a list is provided, any submodules within a given module will also be traced.
        min_duration: A minimum duration in seconds for which a function must run before it's traced.
            With `0`, every function is traced from its very first call.
            Otherwise each function is initially only timed, and becomes traced on
            subsequent calls once it has run for at least `min_duration`.
        check_imported_modules: If this is `'error'` (the default), then an exception will be raised if any of the
            modules in `sys.modules` (i.e. modules that have already been imported) match the modules to trace.
            Set to `'warn'` to issue a warning instead, or `'ignore'` to skip the check.
    """
    # Delegates to the module-level install_auto_tracing helper.
    install_auto_tracing(self, modules, min_duration=min_duration, check_imported_modules=check_imported_modules)

instrument_pydantic

instrument_pydantic(
    record: PydanticPluginRecordValues = "all",
    include: Iterable[str] = (),
    exclude: Iterable[str] = (),
)

Instrument Pydantic model validations.

This must be called before defining and importing the model classes you want to instrument. See the Pydantic integration guide for more info.

Parameters:

Name Type Description Default

record

PydanticPluginRecordValues

The record mode for the Pydantic plugin. It can be one of the following values:

  • all: Send traces and metrics for all events. This is default value.
  • failure: Send metrics for all validations and traces only for validation failures.
  • metrics: Send only metrics.
  • off: Disable instrumentation.
'all'

include

Iterable[str]

By default, third party modules are not instrumented. This option allows you to include specific modules.

()

exclude

Iterable[str]

Exclude specific modules from instrumentation.

()
Source code in logfire/_internal/main.py
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
def instrument_pydantic(
    self,
    record: PydanticPluginRecordValues = 'all',
    include: Iterable[str] = (),
    exclude: Iterable[str] = (),
):
    """Instrument Pydantic model validations.

    Call this before defining and importing the model classes you want to instrument.
    See the [Pydantic integration guide](https://logfire.pydantic.dev/docs/integrations/pydantic/) for more info.

    Args:
        record: The record mode for the Pydantic plugin. It can be one of the following values:

            - `all`: Send traces and metrics for all events. This is default value.
            - `failure`: Send metrics for all validations and traces only for validation failures.
            - `metrics`: Send only metrics.
            - `off`: Disable instrumentation.
        include:
            By default, third party modules are not instrumented. This option allows you to include specific modules.
        exclude:
            Exclude specific modules from instrumentation.
    """
    # Unlike most instrument_* methods, _warn_if_not_initialized_for_instrumentation is
    # deliberately NOT called here, because this method needs to be called early.

    if record != 'off':
        # Only pull in pydantic (and enforce its version) when instrumentation is on.
        import pydantic

        if get_version(pydantic.__version__) < get_version('2.5.0'):  # pragma: no cover
            raise RuntimeError('The Pydantic plugin requires Pydantic 2.5.0 or newer.')

    from logfire.integrations.pydantic import PydanticPlugin, set_pydantic_plugin_config

    # A bare string means a single module name, not an iterable of characters.
    include = {include} if isinstance(include, str) else include
    exclude = {exclude} if isinstance(exclude, str) else exclude

    plugin = PydanticPlugin(
        record=record,
        include=set(include),
        exclude=set(exclude),
    )
    set_pydantic_plugin_config(plugin)

instrument_fastapi

instrument_fastapi(
    app: FastAPI,
    *,
    capture_headers: bool = False,
    request_attributes_mapper: (
        Callable[
            [Request | WebSocket, dict[str, Any]],
            dict[str, Any] | None,
        ]
        | None
    ) = None,
    use_opentelemetry_instrumentation: bool = True,
    excluded_urls: str | Iterable[str] | None = None,
    record_send_receive: bool = False,
    **opentelemetry_kwargs: Any
) -> ContextManager[None]

Instrument a FastAPI app so that spans and logs are automatically created for each request.

Parameters:

Name Type Description Default

app

FastAPI

The FastAPI app to instrument.

required

capture_headers

bool

Set to True to capture all request and response headers.

False

request_attributes_mapper

Callable[[Request | WebSocket, dict[str, Any]], dict[str, Any] | None] | None

A function that takes a Request or WebSocket and a dictionary of attributes and returns a new dictionary of attributes. The input dictionary will contain:

  • values: A dictionary mapping argument names of the endpoint function to parsed and validated values.
  • errors: A list of validation errors for any invalid inputs.

The returned dictionary will be used as the attributes for a log message. If None is returned, no log message will be created.

You can use this to e.g. only log validation errors, or nothing at all. You can also add custom attributes.

The default implementation will return the input dictionary unchanged. The function mustn't modify the contents of values or errors.

None

excluded_urls

str | Iterable[str] | None

A string of comma-separated regexes which will exclude a request from tracing if the full URL matches any of the regexes. This applies to both the Logfire and OpenTelemetry instrumentation. If not provided, the environment variables OTEL_PYTHON_FASTAPI_EXCLUDED_URLS and OTEL_PYTHON_EXCLUDED_URLS will be checked.

None

use_opentelemetry_instrumentation

bool

If True (the default) then FastAPIInstrumentor will also instrument the app.

See OpenTelemetry FastAPI Instrumentation.

True

record_send_receive

bool

Set to True to allow the OpenTelemetry ASGI to create send/receive spans. These are disabled by default to reduce overhead and the number of spans created, since many can be created for a single request, and they are not often useful. If enabled, they will be set to debug level, meaning they will usually still be hidden in the UI.

False

opentelemetry_kwargs

Any

Additional keyword arguments to pass to the OpenTelemetry FastAPI instrumentation.

{}

Returns:

Type Description
ContextManager[None]

A context manager that will revert the instrumentation when exited. This context manager doesn't take into account threads or other concurrency. Calling this method will immediately apply the instrumentation without waiting for the context manager to be opened, i.e. it's not necessary to use this as a context manager.

Source code in logfire/_internal/main.py
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
def instrument_fastapi(
    self,
    app: FastAPI,
    *,
    capture_headers: bool = False,
    request_attributes_mapper: Callable[
        [
            Request | WebSocket,
            dict[str, Any],
        ],
        dict[str, Any] | None,
    ]
    | None = None,
    use_opentelemetry_instrumentation: bool = True,
    excluded_urls: str | Iterable[str] | None = None,
    record_send_receive: bool = False,
    **opentelemetry_kwargs: Any,
) -> ContextManager[None]:
    """Instrument a FastAPI app so that spans and logs are automatically created for each request.

    Args:
        app: The FastAPI app to instrument.
        capture_headers: Set to `True` to capture all request and response headers.
        request_attributes_mapper: A function receiving a [`Request`][fastapi.Request] or
            [`WebSocket`][fastapi.WebSocket] plus a dictionary of attributes, returning a
            new attribute dictionary. The input dictionary contains:

            - `values`: A dictionary mapping argument names of the endpoint function to parsed and validated values.
            - `errors`: A list of validation errors for any invalid inputs.

            The dictionary it returns becomes the attributes of a log message;
            returning `None` suppresses the log message entirely.

            This makes it possible to log e.g. only validation errors, or nothing at all,
            and to attach custom attributes.

            The default implementation returns the input dictionary unchanged.
            The function mustn't modify the contents of `values` or `errors`.
        excluded_urls: A string of comma-separated regexes which will exclude a request from tracing if the full URL
            matches any of the regexes. This applies to both the Logfire and OpenTelemetry instrumentation.
            When omitted, the environment variables
            `OTEL_PYTHON_FASTAPI_EXCLUDED_URLS` and `OTEL_PYTHON_EXCLUDED_URLS` are consulted.
        use_opentelemetry_instrumentation: If True (the default) then
            [`FastAPIInstrumentor`][opentelemetry.instrumentation.fastapi.FastAPIInstrumentor]
            will also instrument the app.

            See [OpenTelemetry FastAPI Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/fastapi/fastapi.html).
        record_send_receive: Set to True to allow the OpenTelemetry ASGI to create send/receive spans.
            These are off by default since a single request can produce many of them,
            adding overhead without much value. When enabled they are emitted at debug
            level, so they usually remain hidden in the UI.
        opentelemetry_kwargs: Additional keyword arguments to pass to the OpenTelemetry FastAPI instrumentation.

    Returns:
        A context manager that reverts the instrumentation on exit.
            The instrumentation is applied immediately when this method is called,
            before the context manager is ever entered, so using it as a context
            manager is optional. No accounting is done for threads or other concurrency.
    """
    from .integrations.fastapi import instrument_fastapi

    self._warn_if_not_initialized_for_instrumentation()
    return instrument_fastapi(
        self,
        app,
        capture_headers=capture_headers,
        excluded_urls=excluded_urls,
        request_attributes_mapper=request_attributes_mapper,
        record_send_receive=record_send_receive,
        use_opentelemetry_instrumentation=use_opentelemetry_instrumentation,
        **opentelemetry_kwargs,
    )

instrument_openai

instrument_openai(
    openai_client: (
        OpenAI
        | AsyncOpenAI
        | type[OpenAI]
        | type[AsyncOpenAI]
        | None
    ) = None,
    *,
    suppress_other_instrumentation: bool = True
) -> ContextManager[None]

Instrument an OpenAI client so that spans are automatically created for each request.

The following methods are instrumented for both the sync and the async clients:

  • client.chat.completions.create — with and without stream=True
  • client.completions.create — with and without stream=True
  • client.embeddings.create
  • client.images.generate

When stream=True a second span is created to instrument the streamed response.

Example usage:

import logfire
import openai

client = openai.OpenAI()
logfire.configure()
logfire.instrument_openai(client)

response = client.chat.completions.create(
    model='gpt-4',
    messages=[
        {'role': 'system', 'content': 'You are a helpful assistant.'},
        {'role': 'user', 'content': 'What is four plus five?'},
    ],
)
print('answer:', response.choices[0].message.content)

Parameters:

Name Type Description Default

openai_client

OpenAI | AsyncOpenAI | type[OpenAI] | type[AsyncOpenAI] | None

The OpenAI client or class to instrument:

  • None (the default) to instrument both the openai.OpenAI and openai.AsyncOpenAI classes.
  • The openai.OpenAI class or a subclass
  • The openai.AsyncOpenAI class or a subclass
  • An instance of openai.OpenAI
  • An instance of openai.AsyncOpenAI
None

suppress_other_instrumentation

bool

If True, suppress any other OTEL instrumentation that may be otherwise enabled. In reality, this means the HTTPX instrumentation, which could otherwise be called since OpenAI uses HTTPX to make HTTP requests.

True

Returns:

Type Description
ContextManager[None]

A context manager that will revert the instrumentation when exited. Use of this context manager is optional.

Source code in logfire/_internal/main.py
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
def instrument_openai(
    self,
    openai_client: openai.OpenAI
    | openai.AsyncOpenAI
    | type[openai.OpenAI]
    | type[openai.AsyncOpenAI]
    | None = None,
    *,
    suppress_other_instrumentation: bool = True,
) -> ContextManager[None]:
    """Instrument an OpenAI client so that spans are automatically created for each request.

    The following methods are instrumented for both the sync and the async clients:

    - [`client.chat.completions.create`](https://platform.openai.com/docs/guides/text-generation/chat-completions-api) — with and without `stream=True`
    - [`client.completions.create`](https://platform.openai.com/docs/guides/text-generation/completions-api) — with and without `stream=True`
    - [`client.embeddings.create`](https://platform.openai.com/docs/guides/embeddings/how-to-get-embeddings)
    - [`client.images.generate`](https://platform.openai.com/docs/guides/images/generations)

    With `stream=True`, a second span instruments the streamed response.

    Example usage:

    ```python
    import logfire
    import openai

    client = openai.OpenAI()
    logfire.configure()
    logfire.instrument_openai(client)

    response = client.chat.completions.create(
        model='gpt-4',
        messages=[
            {'role': 'system', 'content': 'You are a helpful assistant.'},
            {'role': 'user', 'content': 'What is four plus five?'},
        ],
    )
    print('answer:', response.choices[0].message.content)
    ```

    Args:
        openai_client: The OpenAI client or class to instrument:

            - `None` (the default) to instrument both the `openai.OpenAI` and `openai.AsyncOpenAI` classes.
            - The `openai.OpenAI` class or a subclass
            - The `openai.AsyncOpenAI` class or a subclass
            - An instance of `openai.OpenAI`
            - An instance of `openai.AsyncOpenAI`

        suppress_other_instrumentation: If True, suppress any other OTEL instrumentation that may be otherwise
            enabled. In practice this means the HTTPX instrumentation, which could otherwise fire because
            OpenAI uses HTTPX to make HTTP requests.

    Returns:
        A context manager that will revert the instrumentation when exited.
            Use of this context manager is optional.
    """
    import openai

    from .integrations.llm_providers.llm_provider import instrument_llm_provider
    from .integrations.llm_providers.openai import get_endpoint_config, is_async_client, on_response

    self._warn_if_not_initialized_for_instrumentation()
    # No explicit client means: instrument both client classes.
    target = openai_client or (openai.OpenAI, openai.AsyncOpenAI)
    return instrument_llm_provider(
        self,
        target,
        suppress_other_instrumentation,
        'OpenAI',
        get_endpoint_config,
        on_response,
        is_async_client,
    )

instrument_anthropic

instrument_anthropic(
    anthropic_client: (
        Anthropic
        | AsyncAnthropic
        | type[Anthropic]
        | type[AsyncAnthropic]
        | None
    ) = None,
    *,
    suppress_other_instrumentation: bool = True
) -> ContextManager[None]

Instrument an Anthropic client so that spans are automatically created for each request.

The following methods are instrumented for both the sync and the async clients:

  • client.messages.create
  • client.messages.stream
  • client.beta.tools.messages.create

When stream=True a second span is created to instrument the streamed response.

Example usage:

import logfire
import anthropic

client = anthropic.Anthropic()
logfire.configure()
logfire.instrument_anthropic(client)

response = client.messages.create(
    model='claude-3-haiku-20240307',
    system='You are a helpful assistant.',
    messages=[
        {'role': 'user', 'content': 'What is four plus five?'},
    ],
)
print('answer:', response.content[0].text)

Parameters:

Name Type Description Default

anthropic_client

Anthropic | AsyncAnthropic | type[Anthropic] | type[AsyncAnthropic] | None

The Anthropic client or class to instrument:

  • None (the default) to instrument both the anthropic.Anthropic and anthropic.AsyncAnthropic classes.
  • The anthropic.Anthropic class or a subclass
  • The anthropic.AsyncAnthropic class or a subclass
  • An instance of anthropic.Anthropic
  • An instance of anthropic.AsyncAnthropic
None

suppress_other_instrumentation

bool

If True, suppress any other OTEL instrumentation that may be otherwise enabled. In reality, this means the HTTPX instrumentation, which could otherwise be called since Anthropic uses HTTPX to make HTTP requests.

True

Returns:

Type Description
ContextManager[None]

A context manager that will revert the instrumentation when exited. Use of this context manager is optional.

Source code in logfire/_internal/main.py
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
def instrument_anthropic(
    self,
    anthropic_client: anthropic.Anthropic
    | anthropic.AsyncAnthropic
    | type[anthropic.Anthropic]
    | type[anthropic.AsyncAnthropic]
    | None = None,
    *,
    suppress_other_instrumentation: bool = True,
) -> ContextManager[None]:
    """Instrument an Anthropic client so that spans are automatically created for each request.

    The following methods are instrumented for both the sync and the async clients:

    - [`client.messages.create`](https://docs.anthropic.com/en/api/messages)
    - [`client.messages.stream`](https://docs.anthropic.com/en/api/messages-streaming)
    - [`client.beta.tools.messages.create`](https://docs.anthropic.com/en/docs/tool-use)

    When `stream=True` a second span is created to instrument the streamed response.

    Example usage:

    ```python
    import logfire
    import anthropic

    client = anthropic.Anthropic()
    logfire.configure()
    logfire.instrument_anthropic(client)

    response = client.messages.create(
        model='claude-3-haiku-20240307',
        system='You are a helpful assistant.',
        messages=[
            {'role': 'user', 'content': 'What is four plus five?'},
        ],
    )
    print('answer:', response.content[0].text)
    ```

    Args:
        anthropic_client: The Anthropic client or class to instrument:

            - `None` (the default) to instrument both the
                `anthropic.Anthropic` and `anthropic.AsyncAnthropic` classes.
            - The `anthropic.Anthropic` class or a subclass
            - The `anthropic.AsyncAnthropic` class or a subclass
            - An instance of `anthropic.Anthropic`
            - An instance of `anthropic.AsyncAnthropic`

        suppress_other_instrumentation: If True, suppress any other OTEL instrumentation that may be otherwise
            enabled. In reality, this means the HTTPX instrumentation, which could otherwise be called since
            Anthropic uses HTTPX to make HTTP requests.

    Returns:
        A context manager that will revert the instrumentation when exited.
            Use of this context manager is optional.
    """
    import anthropic

    from .integrations.llm_providers.anthropic import get_endpoint_config, is_async_client, on_response
    from .integrations.llm_providers.llm_provider import instrument_llm_provider

    self._warn_if_not_initialized_for_instrumentation()
    return instrument_llm_provider(
        self,
        anthropic_client or (anthropic.Anthropic, anthropic.AsyncAnthropic),
        suppress_other_instrumentation,
        'Anthropic',
        get_endpoint_config,
        on_response,
        is_async_client,
    )

instrument_asyncpg

instrument_asyncpg(
    **kwargs: Unpack[AsyncPGInstrumentKwargs],
) -> None

Instrument the asyncpg module so that spans are automatically created for each query.

Source code in logfire/_internal/main.py
1076
1077
1078
1079
1080
1081
def instrument_asyncpg(self, **kwargs: Unpack[AsyncPGInstrumentKwargs]) -> None:
    """Create spans automatically for each query by instrumenting the `asyncpg` module."""
    from .integrations.asyncpg import instrument_asyncpg as _instrument_asyncpg

    self._warn_if_not_initialized_for_instrumentation()
    return _instrument_asyncpg(**kwargs)

instrument_httpx

instrument_httpx(
    **kwargs: Unpack[HTTPXInstrumentKwargs],
) -> None

Instrument the httpx module so that spans are automatically created for each request.

Uses the OpenTelemetry HTTPX Instrumentation library, specifically HTTPXClientInstrumentor().instrument(), to which it passes **kwargs.

Source code in logfire/_internal/main.py
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
def instrument_httpx(self, **kwargs: Unpack[HTTPXInstrumentKwargs]) -> None:
    """Create spans automatically for each request by instrumenting the `httpx` module.

    Backed by the
    [OpenTelemetry HTTPX Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/httpx/httpx.html)
    library — specifically `HTTPXClientInstrumentor().instrument()`, which receives `**kwargs`.
    """
    from .integrations.httpx import instrument_httpx as _instrument_httpx

    self._warn_if_not_initialized_for_instrumentation()
    return _instrument_httpx(**kwargs)

instrument_celery

instrument_celery(
    **kwargs: Unpack[CeleryInstrumentKwargs],
) -> None

Instrument celery so that spans are automatically created for each task.

Uses the OpenTelemetry Celery Instrumentation library.

Source code in logfire/_internal/main.py
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
def instrument_celery(self, **kwargs: Unpack[CeleryInstrumentKwargs]) -> None:
    """Create spans automatically for each task by instrumenting `celery`.

    Backed by the
    [OpenTelemetry Celery Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/celery/celery.html)
    library.
    """
    from .integrations.celery import instrument_celery as _instrument_celery

    self._warn_if_not_initialized_for_instrumentation()
    return _instrument_celery(**kwargs)

instrument_django

instrument_django(
    capture_headers: bool = False,
    is_sql_commentor_enabled: bool | None = None,
    request_hook: (
        Callable[[Span, HttpRequest], None] | None
    ) = None,
    response_hook: (
        Callable[[Span, HttpRequest, HttpResponse], None]
        | None
    ) = None,
    excluded_urls: str | None = None,
    **kwargs: Any,
) -> None

Instrument django so that spans are automatically created for each web request.

Uses the OpenTelemetry Django Instrumentation library.

Parameters:

Name Type Description Default

capture_headers

bool

Set to True to capture all request and response headers.

False

is_sql_commentor_enabled

bool | None

Adds comments to SQL queries performed by Django, so that database logs have additional context.

This does NOT create spans/logs for the queries themselves. For that you need to instrument the database driver, e.g. with logfire.instrument_psycopg().

To configure the SQL Commentor, see the OpenTelemetry documentation for the values that need to be added to settings.py.

None

request_hook

Callable[[Span, HttpRequest], None] | None

A function called right after a span is created for a request. The function should accept two arguments: the span and the Django Request object.

None

response_hook

Callable[[Span, HttpRequest, HttpResponse], None] | None

A function called right before a span is finished for the response. The function should accept three arguments: the span, the Django Request object, and the Django Response object.

None

excluded_urls

str | None

A string containing a comma-delimited list of regexes used to exclude URLs from tracking.

None

**kwargs

Any

Additional keyword arguments to pass to the OpenTelemetry instrument method, for future compatibility.

{}
Source code in logfire/_internal/main.py
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
def instrument_django(
    self,
    capture_headers: bool = False,
    is_sql_commentor_enabled: bool | None = None,
    request_hook: Callable[[Span, HttpRequest], None] | None = None,
    response_hook: Callable[[Span, HttpRequest, HttpResponse], None] | None = None,
    excluded_urls: str | None = None,
    **kwargs: Any,
) -> None:
    """Create spans automatically for each web request by instrumenting `django`.

    Backed by the
    [OpenTelemetry Django Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/django/django.html)
    library.

    Args:
        capture_headers: Set to `True` to capture all request and response headers.
        is_sql_commentor_enabled: Adds comments to SQL queries performed by Django,
            giving database logs additional context.

            This does NOT create spans/logs for the queries themselves.
            For that, instrument the database driver, e.g. with `logfire.instrument_psycopg()`.

            To configure the SQL Commentor, see the OpenTelemetry documentation for the
            values that need to be added to `settings.py`.

        request_hook: A function called right after a span is created for a request.
            The function should accept two arguments: the span and the Django `Request` object.

        response_hook: A function called right before a span is finished for the response.
            The function should accept three arguments:
            the span, the Django `Request` object, and the Django `Response` object.

        excluded_urls: A string containing a comma-delimited list of regexes used to exclude URLs from tracking.

        **kwargs: Additional keyword arguments to pass to the OpenTelemetry `instrument` method,
            for future compatibility.

    """
    from .integrations.django import instrument_django

    self._warn_if_not_initialized_for_instrumentation()
    return instrument_django(
        capture_headers=capture_headers,
        request_hook=request_hook,
        response_hook=response_hook,
        is_sql_commentor_enabled=is_sql_commentor_enabled,
        excluded_urls=excluded_urls,
        **kwargs,
    )

instrument_requests

instrument_requests(
    excluded_urls: str | None = None, **kwargs: Any
) -> None

Instrument the requests module so that spans are automatically created for each request.

Parameters:

Name Type Description Default

excluded_urls

str | None

A string containing a comma-delimited list of regexes used to exclude URLs from tracking

None

**kwargs

Any

Additional keyword arguments to pass to the OpenTelemetry instrument methods, particularly request_hook and response_hook.

{}
Source code in logfire/_internal/main.py
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
def instrument_requests(self, excluded_urls: str | None = None, **kwargs: Any) -> None:
    """Instrument the `requests` module so that spans are automatically created for each request.

    Args:
        excluded_urls: A comma-delimited list of regexes; URLs matching any of them are excluded from tracking.
        **kwargs: Extra keyword arguments forwarded to the OpenTelemetry `instrument` methods,
            in particular `request_hook` and `response_hook`.
    """
    from .integrations.requests import instrument_requests as instrument

    self._warn_if_not_initialized_for_instrumentation()
    return instrument(excluded_urls=excluded_urls, **kwargs)

instrument_psycopg

instrument_psycopg(
    conn_or_module: Any = None,
    **kwargs: Unpack[PsycopgInstrumentKwargs],
) -> None

Instrument a psycopg connection or module so that spans are automatically created for each query.

Uses the OpenTelemetry instrumentation libraries for psycopg and psycopg2.

Parameters:

Name Type Description Default

conn_or_module

Any

Can be:

  • The psycopg (version 3) or psycopg2 module.
  • The string 'psycopg' or 'psycopg2' to instrument the module.
  • None (the default) to instrument whichever module(s) are installed.
  • A psycopg or psycopg2 connection.
None

**kwargs

Unpack[PsycopgInstrumentKwargs]

Additional keyword arguments to pass to the OpenTelemetry instrument methods, particularly enable_commenter and commenter_options.

{}
Source code in logfire/_internal/main.py
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
def instrument_psycopg(self, conn_or_module: Any = None, **kwargs: Unpack[PsycopgInstrumentKwargs]) -> None:
    """Instrument a `psycopg` connection or module so that spans are automatically created for each query.

    Uses the OpenTelemetry instrumentation libraries for
    [`psycopg`](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/psycopg/psycopg.html)
    and
    [`psycopg2`](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/psycopg2/psycopg2.html).

    Args:
        conn_or_module: One of:

            - The `psycopg` (version 3) or `psycopg2` module.
            - The string `'psycopg'` or `'psycopg2'` to instrument that module.
            - `None` (the default) to instrument whichever module(s) are installed.
            - A `psycopg` or `psycopg2` connection.

        **kwargs: Extra keyword arguments forwarded to the OpenTelemetry `instrument` methods,
            in particular `enable_commenter` and `commenter_options`.
    """
    from .integrations.psycopg import instrument_psycopg as instrument

    self._warn_if_not_initialized_for_instrumentation()
    return instrument(conn_or_module, **kwargs)

instrument_flask

instrument_flask(
    app: Flask,
    *,
    capture_headers: bool = False,
    **kwargs: Unpack[FlaskInstrumentKwargs]
) -> None

Instrument app so that spans are automatically created for each request.

Set capture_headers to True to capture all request and response headers.

Uses the OpenTelemetry Flask Instrumentation library, specifically FlaskInstrumentor().instrument_app(), to which it passes **kwargs.

Source code in logfire/_internal/main.py
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
def instrument_flask(
    self, app: Flask, *, capture_headers: bool = False, **kwargs: Unpack[FlaskInstrumentKwargs]
) -> None:
    """Instrument `app` so that spans are automatically created for each request.

    Pass `capture_headers=True` to record all request and response headers.

    Uses the
    [OpenTelemetry Flask Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/flask/flask.html)
    library, specifically `FlaskInstrumentor().instrument_app()`, to which `**kwargs` is forwarded.
    """
    from .integrations.flask import instrument_flask as instrument

    self._warn_if_not_initialized_for_instrumentation()
    return instrument(app, capture_headers=capture_headers, **kwargs)

instrument_starlette

instrument_starlette(
    app: Starlette,
    *,
    capture_headers: bool = False,
    record_send_receive: bool = False,
    **kwargs: Unpack[StarletteInstrumentKwargs]
) -> None

Instrument app so that spans are automatically created for each request.

Set capture_headers to True to capture all request and response headers.

Set record_send_receive to True to allow the OpenTelemetry ASGI to create send/receive spans. These are disabled by default to reduce overhead and the number of spans created, since many can be created for a single request, and they are not often useful. If enabled, they will be set to debug level, meaning they will usually still be hidden in the UI.

Uses the OpenTelemetry Starlette Instrumentation library, specifically StarletteInstrumentor.instrument_app(), to which it passes **kwargs.

Source code in logfire/_internal/main.py
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
def instrument_starlette(
    self,
    app: Starlette,
    *,
    capture_headers: bool = False,
    record_send_receive: bool = False,
    **kwargs: Unpack[StarletteInstrumentKwargs],
) -> None:
    """Instrument `app` so that spans are automatically created for each request.

    Pass `capture_headers=True` to record all request and response headers.

    Pass `record_send_receive=True` to let the OpenTelemetry ASGI layer create send/receive spans.
    These are off by default to reduce overhead and span count — many can be produced per request
    and they are rarely useful. When enabled they are emitted at debug level,
    so they will usually still be hidden in the UI.

    Uses the
    [OpenTelemetry Starlette Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/starlette/starlette.html)
    library, specifically `StarletteInstrumentor.instrument_app()`, to which `**kwargs` is forwarded.
    """
    from .integrations.starlette import instrument_starlette as instrument

    self._warn_if_not_initialized_for_instrumentation()
    # NOTE: unlike the other web-framework helpers, the Starlette integration
    # also receives this Logfire instance as its first argument.
    return instrument(
        self,
        app,
        record_send_receive=record_send_receive,
        capture_headers=capture_headers,
        **kwargs,
    )

instrument_aiohttp_client

instrument_aiohttp_client(**kwargs: Any) -> None

Instrument the aiohttp module so that spans are automatically created for each client request.

Uses the OpenTelemetry aiohttp client Instrumentation library, specifically AioHttpClientInstrumentor().instrument(), to which it passes **kwargs.

Source code in logfire/_internal/main.py
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
def instrument_aiohttp_client(self, **kwargs: Any) -> None:
    """Instrument the `aiohttp` module so that spans are automatically created for each client request.

    Uses the
    [OpenTelemetry aiohttp client Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/aiohttp_client/aiohttp_client.html)
    library, specifically `AioHttpClientInstrumentor().instrument()`, to which `**kwargs` is forwarded.
    """
    from .integrations.aiohttp_client import instrument_aiohttp_client as instrument

    self._warn_if_not_initialized_for_instrumentation()
    return instrument(**kwargs)

instrument_sqlalchemy

instrument_sqlalchemy(
    **kwargs: Unpack[SQLAlchemyInstrumentKwargs],
) -> None

Instrument the sqlalchemy module so that spans are automatically created for each query.

Uses the OpenTelemetry SQLAlchemy Instrumentation library, specifically SQLAlchemyInstrumentor().instrument(), to which it passes **kwargs.

Source code in logfire/_internal/main.py
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
def instrument_sqlalchemy(self, **kwargs: Unpack[SQLAlchemyInstrumentKwargs]) -> None:
    """Instrument the `sqlalchemy` module so that spans are automatically created for each query.

    Uses the
    [OpenTelemetry SQLAlchemy Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/sqlalchemy/sqlalchemy.html)
    library, specifically `SQLAlchemyInstrumentor().instrument()`, to which `**kwargs` is forwarded.
    """
    from .integrations.sqlalchemy import instrument_sqlalchemy as instrument

    self._warn_if_not_initialized_for_instrumentation()
    return instrument(**kwargs)

instrument_pymongo

instrument_pymongo(
    **kwargs: Unpack[PymongoInstrumentKwargs],
) -> None

Instrument the pymongo module so that spans are automatically created for each operation.

Uses the OpenTelemetry pymongo Instrumentation library, specifically PymongoInstrumentor().instrument(), to which it passes **kwargs.

Source code in logfire/_internal/main.py
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
def instrument_pymongo(self, **kwargs: Unpack[PymongoInstrumentKwargs]) -> None:
    """Instrument the `pymongo` module so that spans are automatically created for each operation.

    Uses the
    [OpenTelemetry pymongo Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/pymongo/pymongo.html)
    library, specifically `PymongoInstrumentor().instrument()`, to which `**kwargs` is forwarded.
    """
    from .integrations.pymongo import instrument_pymongo as instrument

    self._warn_if_not_initialized_for_instrumentation()
    return instrument(**kwargs)

instrument_redis

instrument_redis(
    capture_statement: bool = False,
    **kwargs: Unpack[RedisInstrumentKwargs],
) -> None

Instrument the redis module so that spans are automatically created for each operation.

Uses the OpenTelemetry Redis Instrumentation library, specifically RedisInstrumentor().instrument(), to which it passes **kwargs.

Parameters:

Name Type Description Default

capture_statement

bool

Set to True to capture the statement in the span attributes.

False

kwargs

Unpack[RedisInstrumentKwargs]

Additional keyword arguments to pass to the OpenTelemetry instrument methods.

{}
Source code in logfire/_internal/main.py
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
def instrument_redis(self, capture_statement: bool = False, **kwargs: Unpack[RedisInstrumentKwargs]) -> None:
    """Instrument the `redis` module so that spans are automatically created for each operation.

    Uses the
    [OpenTelemetry Redis Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/redis/redis.html)
    library, specifically `RedisInstrumentor().instrument()`, to which `**kwargs` is forwarded.

    Args:
        capture_statement: Pass `True` to record the statement in the span attributes.
        kwargs: Extra keyword arguments forwarded to the OpenTelemetry `instrument` methods.
    """
    from .integrations.redis import instrument_redis as instrument

    self._warn_if_not_initialized_for_instrumentation()
    return instrument(capture_statement=capture_statement, **kwargs)

instrument_mysql

instrument_mysql(
    conn: MySQLConnection = None,
    **kwargs: Unpack[MySQLInstrumentKwargs],
) -> MySQLConnection

Instrument the mysql module or a specific MySQL connection so that spans are automatically created for each operation.

Uses the OpenTelemetry MySQL Instrumentation library.

Parameters:

Name Type Description Default

conn

MySQLConnection

The mysql connection to instrument, or None to instrument all connections.

None

**kwargs

Unpack[MySQLInstrumentKwargs]

Additional keyword arguments to pass to the OpenTelemetry instrument methods.

{}

Returns:

Type Description
MySQLConnection

If a connection is provided, returns the instrumented connection. If no connection is provided, returns None.

Source code in logfire/_internal/main.py
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
def instrument_mysql(
    self,
    conn: MySQLConnection = None,
    **kwargs: Unpack[MySQLInstrumentKwargs],
) -> MySQLConnection:
    """Instrument the `mysql` module or a specific MySQL connection so that spans are automatically created for each operation.

    Uses the
    [OpenTelemetry MySQL Instrumentation](https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/mysql/mysql.html)
    library.

    Args:
        conn: The `mysql` connection to instrument, or `None` to instrument all connections.
        **kwargs: Extra keyword arguments forwarded to the OpenTelemetry `instrument` methods.

    Returns:
        The instrumented connection when one was provided, otherwise `None`.
    """
    from .integrations.mysql import instrument_mysql as instrument

    self._warn_if_not_initialized_for_instrumentation()
    return instrument(conn, **kwargs)

instrument_system_metrics

instrument_system_metrics(
    config: Config | None = None, base: Base = "basic"
) -> None

Collect system metrics.

See the guide for more information.

Parameters:

Name Type Description Default

config

Config | None

A dictionary where the keys are metric names and the values are optional further configuration for that metric.

None

base

Base

A string indicating the base config dictionary which config will be merged with, or None for an empty base config.

'basic'
Source code in logfire/_internal/main.py
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
def instrument_system_metrics(
    self, config: SystemMetricsConfig | None = None, base: SystemMetricsBase = 'basic'
) -> None:
    """Collect system metrics.

    See [the guide](https://logfire.pydantic.dev/docs/integrations/system-metrics/) for more information.

    Args:
        config: Maps each metric name to optional further configuration for that metric.
        base: Name of the base config dictionary that `config` is merged with,
            or `None` to start from an empty base config.
    """
    from .integrations.system_metrics import instrument_system_metrics as instrument

    self._warn_if_not_initialized_for_instrumentation()
    return instrument(self, config, base)

metric_counter

metric_counter(
    name: str, *, unit: str = "", description: str = ""
) -> Counter

Create a counter metric.

A counter is a cumulative metric that represents a single numerical value that only ever goes up.

import logfire

logfire.configure()
counter = logfire.metric_counter('exceptions', unit='1', description='Number of exceptions caught')

try:
    raise Exception('oops')
except Exception:
    counter.add(1)

See the Opentelemetry documentation about counters.

Parameters:

Name Type Description Default

name

str

The name of the metric.

required

unit

str

The unit of the metric.

''

description

str

The description of the metric.

''

Returns:

Type Description
Counter

The counter metric.

Source code in logfire/_internal/main.py
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
def metric_counter(self, name: str, *, unit: str = '', description: str = '') -> Counter:
    """Create a counter metric.

    A counter is a cumulative metric representing a single numerical value that only ever goes up.

    ```py
    import logfire

    logfire.configure()
    counter = logfire.metric_counter('exceptions', unit='1', description='Number of exceptions caught')

    try:
        raise Exception('oops')
    except Exception:
        counter.add(1)
    ```

    See the [Opentelemetry documentation](https://opentelemetry.io/docs/specs/otel/metrics/api/#counter) about
    counters.

    Args:
        name: The name of the metric.
        unit: The unit of the metric.
        description: The description of the metric.

    Returns:
        The counter metric.
    """
    meter = self._config.meter
    return meter.create_counter(name, unit, description)

metric_histogram

metric_histogram(
    name: str, *, unit: str = "", description: str = ""
) -> Histogram

Create a histogram metric.

A histogram is a metric that samples observations (usually things like request durations or response sizes).

import logfire

logfire.configure()
histogram = logfire.metric_histogram('bank.amount_transferred', unit='$', description='Amount transferred')


def transfer(amount: int):
    histogram.record(amount)

See the Opentelemetry documentation about histograms.

Parameters:

Name Type Description Default

name

str

The name of the metric.

required

unit

str

The unit of the metric.

''

description

str

The description of the metric.

''

Returns:

Type Description
Histogram

The histogram metric.

Source code in logfire/_internal/main.py
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
def metric_histogram(self, name: str, *, unit: str = '', description: str = '') -> Histogram:
    """Create a histogram metric.

    A histogram is a metric that samples observations (usually things like request durations or response sizes).

    ```py
    import logfire

    logfire.configure()
    histogram = logfire.metric_histogram('bank.amount_transferred', unit='$', description='Amount transferred')


    def transfer(amount: int):
        histogram.record(amount)
    ```

    See the [Opentelemetry documentation](https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram) about
    histograms.

    Args:
        name: The name of the metric.
        unit: The unit of the metric.
        description: The description of the metric.

    Returns:
        The histogram metric.
    """
    return self._config.meter.create_histogram(name, unit, description)

metric_gauge

metric_gauge(
    name: str, *, unit: str = "", description: str = ""
) -> _Gauge

Create a gauge metric.

Gauge is a synchronous instrument which can be used to record non-additive measurements.

import logfire

logfire.configure()
gauge = logfire.metric_gauge('system.cpu_usage', unit='%', description='CPU usage')


def update_cpu_usage(cpu_percent):
    gauge.set(cpu_percent)

See the Opentelemetry documentation about gauges.

Parameters:

Name Type Description Default

name

str

The name of the metric.

required

unit

str

The unit of the metric.

''

description

str

The description of the metric.

''

Returns:

Type Description
_Gauge

The gauge metric.

Source code in logfire/_internal/main.py
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
def metric_gauge(self, name: str, *, unit: str = '', description: str = '') -> Gauge:
    """Create a gauge metric.

    Gauge is a synchronous instrument which can be used to record non-additive measurements.

    ```py
    import logfire

    logfire.configure()
    gauge = logfire.metric_gauge('system.cpu_usage', unit='%', description='CPU usage')


    def update_cpu_usage(cpu_percent):
        gauge.set(cpu_percent)
    ```

    See the [Opentelemetry documentation](https://opentelemetry.io/docs/specs/otel/metrics/api/#gauge) about gauges.

    Args:
        name: The name of the metric.
        unit: The unit of the metric.
        description: The description of the metric.

    Returns:
        The gauge metric.
    """
    meter = self._config.meter
    return meter.create_gauge(name, unit, description)

metric_up_down_counter

metric_up_down_counter(
    name: str, *, unit: str = "", description: str = ""
) -> UpDownCounter

Create an up-down counter metric.

An up-down counter is a cumulative metric that represents a single numerical value that can be adjusted up or down.

import logfire

logfire.configure()
up_down_counter = logfire.metric_up_down_counter('users.logged_in', unit='1', description='Users logged in')


def on_login(user):
    up_down_counter.add(1)


def on_logout(user):
    up_down_counter.add(-1)

See the Opentelemetry documentation about up-down counters.

Parameters:

Name Type Description Default

name

str

The name of the metric.

required

unit

str

The unit of the metric.

''

description

str

The description of the metric.

''

Returns:

Type Description
UpDownCounter

The up-down counter metric.

Source code in logfire/_internal/main.py
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
def metric_up_down_counter(self, name: str, *, unit: str = '', description: str = '') -> UpDownCounter:
    """Create an up-down counter metric.

    An up-down counter is a cumulative metric representing a single numerical value that can be
    adjusted up or down.

    ```py
    import logfire

    logfire.configure()
    up_down_counter = logfire.metric_up_down_counter('users.logged_in', unit='1', description='Users logged in')


    def on_login(user):
        up_down_counter.add(1)


    def on_logout(user):
        up_down_counter.add(-1)
    ```

    See the [Opentelemetry documentation](https://opentelemetry.io/docs/specs/otel/metrics/api/#updowncounter) about
    up-down counters.

    Args:
        name: The name of the metric.
        unit: The unit of the metric.
        description: The description of the metric.

    Returns:
        The up-down counter metric.
    """
    meter = self._config.meter
    return meter.create_up_down_counter(name, unit, description)

metric_counter_callback

metric_counter_callback(
    name: str,
    *,
    callbacks: Sequence[CallbackT],
    unit: str = "",
    description: str = ""
) -> None

Create a counter metric that uses a callback to collect observations.

The counter metric is a cumulative metric that represents a single numerical value that only ever goes up.

import logfire
import psutil
from opentelemetry.metrics import CallbackOptions, Observation

logfire.configure()


def cpu_usage_callback(options: CallbackOptions):
    cpu_percents = psutil.cpu_percent(percpu=True)

    for i, cpu_percent in enumerate(cpu_percents):
        yield Observation(cpu_percent, {'cpu': i})


cpu_usage_counter = logfire.metric_counter_callback(
    'system.cpu.usage',
    callbacks=[cpu_usage_callback],
    unit='%',
    description='CPU usage',
)

See the Opentelemetry documentation about asynchronous counter.

Parameters:

Name Type Description Default

name

str

The name of the metric.

required

callbacks

Sequence[CallbackT]

A sequence of callbacks that return an iterable of Observation.

required

unit

str

The unit of the metric.

''

description

str

The description of the metric.

''
Source code in logfire/_internal/main.py
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
def metric_counter_callback(
    self,
    name: str,
    *,
    callbacks: Sequence[CallbackT],
    unit: str = '',
    description: str = '',
) -> None:
    """Create a counter metric that uses a callback to collect observations.

    The counter metric is a cumulative metric representing a single numerical value that only ever goes up.

    ```py
    import logfire
    import psutil
    from opentelemetry.metrics import CallbackOptions, Observation

    logfire.configure()


    def cpu_usage_callback(options: CallbackOptions):
        cpu_percents = psutil.cpu_percent(percpu=True)

        for i, cpu_percent in enumerate(cpu_percents):
            yield Observation(cpu_percent, {'cpu': i})


    cpu_usage_counter = logfire.metric_counter_callback(
        'system.cpu.usage',
        callbacks=[cpu_usage_callback],
        unit='%',
        description='CPU usage',
    )
    ```

    See the [Opentelemetry documentation](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-counter)
    about asynchronous counter.

    Args:
        name: The name of the metric.
        callbacks: A sequence of callbacks that return an iterable of
            [Observation](https://opentelemetry-python.readthedocs.io/en/latest/api/metrics.html#opentelemetry.metrics.Observation).
        unit: The unit of the metric.
        description: The description of the metric.
    """
    meter = self._config.meter
    meter.create_observable_counter(name, callbacks, unit, description)

metric_gauge_callback

metric_gauge_callback(
    name: str,
    callbacks: Sequence[CallbackT],
    *,
    unit: str = "",
    description: str = ""
) -> None

Create a gauge metric that uses a callback to collect observations.

The gauge metric is a metric that represents a single numerical value that can arbitrarily go up and down.

import threading

import logfire
from opentelemetry.metrics import CallbackOptions, Observation

logfire.configure()


def thread_count_callback(options: CallbackOptions):
    yield Observation(threading.active_count())


logfire.metric_gauge_callback(
    'system.thread_count',
    callbacks=[thread_count_callback],
    unit='1',
    description='Number of threads',
)

See the Opentelemetry documentation about asynchronous gauge.

Parameters:

Name Type Description Default

name

str

The name of the metric.

required

callbacks

Sequence[CallbackT]

A sequence of callbacks that return an iterable of Observation.

required

unit

str

The unit of the metric.

''

description

str

The description of the metric.

''
Source code in logfire/_internal/main.py
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
def metric_gauge_callback(
    self, name: str, callbacks: Sequence[CallbackT], *, unit: str = '', description: str = ''
) -> None:
    """Create a gauge metric that uses a callback to collect observations.

    The gauge metric is a metric representing a single numerical value that can arbitrarily go up and down.

    ```py
    import threading

    import logfire
    from opentelemetry.metrics import CallbackOptions, Observation

    logfire.configure()


    def thread_count_callback(options: CallbackOptions):
        yield Observation(threading.active_count())


    logfire.metric_gauge_callback(
        'system.thread_count',
        callbacks=[thread_count_callback],
        unit='1',
        description='Number of threads',
    )
    ```

    See the [Opentelemetry documentation](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-gauge)
    about asynchronous gauge.

    Args:
        name: The name of the metric.
        callbacks: A sequence of callbacks that return an iterable of
            [Observation](https://opentelemetry-python.readthedocs.io/en/latest/api/metrics.html#opentelemetry.metrics.Observation).
        unit: The unit of the metric.
        description: The description of the metric.
    """
    meter = self._config.meter
    meter.create_observable_gauge(name, callbacks, unit, description)

metric_up_down_counter_callback

metric_up_down_counter_callback(
    name: str,
    callbacks: Sequence[CallbackT],
    *,
    unit: str = "",
    description: str = ""
) -> None

Create an up-down counter metric that uses a callback to collect observations.

The up-down counter is a cumulative metric that represents a single numerical value that can be adjusted up or down.

import logfire
from opentelemetry.metrics import CallbackOptions, Observation

logfire.configure()

items = []


def inventory_callback(options: CallbackOptions):
    yield Observation(len(items))


logfire.metric_up_down_counter_callback(
    name='store.inventory',
    description='Number of items in the inventory',
    callbacks=[inventory_callback],
)

See the Opentelemetry documentation about asynchronous up-down counters.

Parameters:

Name Type Description Default

name

str

The name of the metric.

required

callbacks

Sequence[CallbackT]

A sequence of callbacks that return an iterable of Observation.

required

unit

str

The unit of the metric.

''

description

str

The description of the metric.

''
Source code in logfire/_internal/main.py
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
def metric_up_down_counter_callback(
    self, name: str, callbacks: Sequence[CallbackT], *, unit: str = '', description: str = ''
) -> None:
    """Create an up-down counter metric that uses a callback to collect observations.

    The up-down counter is a cumulative metric representing a single numerical value that can be
    adjusted up or down.

    ```py
    import logfire
    from opentelemetry.metrics import CallbackOptions, Observation

    logfire.configure()

    items = []


    def inventory_callback(options: CallbackOptions):
        yield Observation(len(items))


    logfire.metric_up_down_counter_callback(
        name='store.inventory',
        description='Number of items in the inventory',
        callbacks=[inventory_callback],
    )
    ```

    See the [Opentelemetry documentation](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-updowncounter)
    about asynchronous up-down counters.

    Args:
        name: The name of the metric.
        callbacks: A sequence of callbacks that return an iterable of
            [Observation](https://opentelemetry-python.readthedocs.io/en/latest/api/metrics.html#opentelemetry.metrics.Observation).
        unit: The unit of the metric.
        description: The description of the metric.
    """
    meter = self._config.meter
    meter.create_observable_up_down_counter(name, callbacks, unit, description)

shutdown

shutdown(
    timeout_millis: int = 30000, flush: bool = True
) -> bool

Shut down all tracers and meters.

This will clean up any resources used by the tracers and meters and flush any remaining spans and metrics.

Parameters:

Name Type Description Default

timeout_millis

int

The timeout in milliseconds.

30000

flush

bool

Whether to flush remaining spans and metrics before shutting down.

True

Returns:

Type Description
bool

False if the timeout was reached before the shutdown was completed, True otherwise.

Source code in logfire/_internal/main.py
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
def shutdown(self, timeout_millis: int = 30_000, flush: bool = True) -> bool:  # pragma: no cover
    """Shut down all tracers and meters.

    This will clean up any resources used by the tracers and meters and flush any remaining spans and metrics.

    Args:
        timeout_millis: The timeout in milliseconds.
        flush: Whether to flush remaining spans and metrics before shutting down.

    Returns:
        `False` if the timeout was reached before the shutdown was completed, `True` otherwise.
    """
    start = time()

    def elapsed_millis() -> float:
        # time() returns seconds; convert so we compare like units with timeout_millis.
        # (The original subtracted elapsed *seconds* from a *millisecond* budget.)
        return (time() - start) * 1000

    if flush:  # pragma: no branch
        self._tracer_provider.force_flush(timeout_millis)
    remaining = max(0, timeout_millis - elapsed_millis())
    if not remaining:  # pragma: no cover
        return False
    self._tracer_provider.shutdown()

    remaining = max(0, timeout_millis - elapsed_millis())
    if not remaining:  # pragma: no cover
        return False
    if flush:  # pragma: no branch
        self._meter_provider.force_flush(remaining)
    remaining = max(0, timeout_millis - elapsed_millis())
    if not remaining:  # pragma: no cover
        return False
    self._meter_provider.shutdown(remaining)
    # Bug fix: the original returned `(start - time()) < timeout_millis`, which is
    # always True because the left side is negative. Elapsed time must be measured
    # as now-minus-start, in milliseconds.
    return elapsed_millis() < timeout_millis

Logfire is the observability tool focused on developer experience.

LevelName module-attribute

LevelName = Literal[
    "trace",
    "debug",
    "info",
    "notice",
    "warn",
    "warning",
    "error",
    "fatal",
]

Level names for records.

SamplingOptions dataclass

SamplingOptions(
    head: float | Sampler = 1.0,
    tail: (
        Callable[[TailSamplingSpanInfo], float] | None
    ) = None,
)

Options for logfire.configure(sampling=...).

See the sampling guide.

head class-attribute instance-attribute

head: float | Sampler = 1.0

Head sampling options.

If it's a float, it should be a number between 0.0 and 1.0. This is the probability that an entire trace will be randomly included.

Alternatively you can pass a custom OpenTelemetry Sampler.

tail class-attribute instance-attribute

tail: Callable[[TailSamplingSpanInfo], float] | None = None

An optional tail sampling callback which will be called for every span.

It should return a number between 0.0 and 1.0, the probability that the entire trace will be included. Use SamplingOptions.level_or_duration for a common use case.

Every span in a trace will be stored in memory until either the trace is included by tail sampling or it's completed and discarded, so large traces may consume a lot of memory.

level_or_duration classmethod

level_or_duration(
    *,
    head: float | Sampler = 1.0,
    level_threshold: LevelName | None = "notice",
    duration_threshold: float | None = 5.0,
    background_rate: float = 0.0
) -> Self

Returns a SamplingOptions instance that tail samples traces based on their log level and duration.

If a trace has at least one span/log that has a log level greater than or equal to level_threshold, or if the duration of the whole trace is greater than duration_threshold seconds, then the whole trace will be included. Otherwise, the probability is background_rate.

The head parameter is the same as in the SamplingOptions constructor.

Source code in logfire/sampling/_tail_sampling.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
@classmethod
def level_or_duration(
    cls,
    *,
    head: float | Sampler = 1.0,
    level_threshold: LevelName | None = 'notice',
    duration_threshold: float | None = 5.0,
    background_rate: float = 0.0,
) -> Self:
    """Build a `SamplingOptions` that tail samples traces by log level and duration.

    A trace is always kept when at least one of its spans/logs has a log level at or
    above `level_threshold`, or when the whole trace lasts longer than
    `duration_threshold` seconds. All other traces are kept with probability
    `background_rate`.

    The `head` parameter has the same meaning as in the `SamplingOptions` constructor.
    """
    # A custom Sampler carries no numeric rate to validate against, so treat it as 1.0.
    upper_bound = head if isinstance(head, (float, int)) else 1.0

    if not (0.0 <= background_rate <= upper_bound <= 1.0):
        raise ValueError('Invalid sampling rates, must be 0.0 <= background_rate <= head <= 1.0')

    def tail_rate(span_info: TailSamplingSpanInfo) -> float:
        # Keep the whole trace if either threshold is crossed (duration checked
        # first, as in the reference implementation); otherwise fall back to the
        # background sampling rate.
        too_long = duration_threshold is not None and span_info.duration > duration_threshold
        if too_long or (level_threshold is not None and span_info.level >= level_threshold):
            return 1.0
        return background_rate

    return cls(head=head, tail=tail_rate)

AutoTraceModule dataclass

AutoTraceModule(name: str, filename: str | None)

Information about a module being imported that should maybe be traced automatically.

This object will be passed to a function that should return True if the module should be traced. In particular it'll be passed to a function that's passed to install_auto_tracing as the modules argument.

name instance-attribute

name: str

Fully qualified absolute name of the module being imported.

filename instance-attribute

filename: str | None

Filename of the module being imported.

parts_start_with

parts_start_with(prefix: str | Sequence[str]) -> bool

Return True if the module name starts with any of the given prefixes, using dots as boundaries.

For example, if the module name is foo.bar.spam, then parts_start_with('foo') will return True, but parts_start_with('bar') or parts_start_with('foo_bar') will return False. In other words, this will match the module itself or any submodules.

If a prefix contains any characters other than letters, numbers, and dots, then it will be treated as a regular expression.

Source code in logfire/_internal/auto_trace/types.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def parts_start_with(self, prefix: str | Sequence[str]) -> bool:
    """Return True if the module name starts with any of the given prefixes, using dots as boundaries.

    For example, if the module name is `foo.bar.spam`, then `parts_start_with('foo')` will return True,
    but `parts_start_with('bar')` or `parts_start_with('foo_bar')` will return False.
    In other words, this will match the module itself or any submodules.

    If a prefix contains any characters other than letters, numbers, and dots,
    then it will be treated as a regular expression.
    """
    if isinstance(prefix, str):
        prefix = (prefix,)
    pattern = '|'.join([get_module_pattern(p) for p in prefix])
    return bool(re.match(pattern, self.name))

AdvancedOptions dataclass

AdvancedOptions(
    base_url: str = "https://logfire-api.pydantic.dev",
    id_generator: IdGenerator = lambda: SeededRandomIdGenerator(
        None
    )(),
    ns_timestamp_generator: Callable[[], int] = time_ns,
)

Options primarily used for testing by Logfire developers.

base_url class-attribute instance-attribute

base_url: str = 'https://logfire-api.pydantic.dev'

Root URL for the Logfire API.

id_generator class-attribute instance-attribute

id_generator: IdGenerator = field(
    default_factory=lambda: SeededRandomIdGenerator(None)
)

Generator for trace and span IDs.

The default generates random IDs and is unaffected by calls to random.seed().

ns_timestamp_generator class-attribute instance-attribute

ns_timestamp_generator: Callable[[], int] = time_ns

Generator for nanosecond start and end timestamps of spans.

ConsoleOptions dataclass

ConsoleOptions(
    colors: ConsoleColorsValues = "auto",
    span_style: Literal[
        "simple", "indented", "show-parents"
    ] = "show-parents",
    include_timestamps: bool = True,
    verbose: bool = False,
    min_log_level: LevelName = "info",
    show_project_link: bool = True,
)

Options for controlling console output.

span_style class-attribute instance-attribute

span_style: Literal[
    "simple", "indented", "show-parents"
] = "show-parents"

How spans are shown in the console.

include_timestamps class-attribute instance-attribute

include_timestamps: bool = True

Whether to include timestamps in the console output.

verbose class-attribute instance-attribute

verbose: bool = False

Whether to show verbose output.

It includes the filename, log level, and line number.

min_log_level class-attribute instance-attribute

min_log_level: LevelName = 'info'

The minimum log level to show in the console.

show_project_link class-attribute instance-attribute

show_project_link: bool = True

Whether to print the URL of the Logfire project after initialization.

MetricsOptions dataclass

MetricsOptions(
    additional_readers: Sequence[MetricReader] = (),
)

Configuration of metrics.

This only has one option for now, but it's a place to add more related options in the future.

additional_readers class-attribute instance-attribute

additional_readers: Sequence[MetricReader] = ()

Sequence of metric readers to be used in addition to the default which exports metrics to Logfire's API.

PydanticPlugin dataclass

PydanticPlugin(
    record: PydanticPluginRecordValues = "off",
    include: set[str] = set(),
    exclude: set[str] = set(),
)

Options for the Pydantic plugin.

This class is deprecated for external use. Use logfire.instrument_pydantic() instead.

record class-attribute instance-attribute

record: PydanticPluginRecordValues = 'off'

The record mode for the Pydantic plugin.

It can be one of the following values:

  • off: Disable instrumentation. This is the default value.
  • all: Send traces and metrics for all events.
  • failure: Send metrics for all validations and traces only for validation failures.
  • metrics: Send only metrics.

include class-attribute instance-attribute

include: set[str] = field(default_factory=set)

By default, third party modules are not instrumented. This option allows you to include specific modules.

exclude class-attribute instance-attribute

exclude: set[str] = field(default_factory=set)

Exclude specific modules from instrumentation.

LogfireSpan

LogfireSpan(
    span_name: str,
    otlp_attributes: dict[str, AttributeValue],
    tracer: Tracer,
    json_schema_properties: JsonSchemaProperties,
)

Bases: ReadableSpan

Source code in logfire/_internal/main.py
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
def __init__(
    self,
    span_name: str,
    otlp_attributes: dict[str, otel_types.AttributeValue],
    tracer: Tracer,
    json_schema_properties: JsonSchemaProperties,
) -> None:
    """Initialize the span wrapper; the underlying OTel span (`self._span`) is created later.

    Args:
        span_name: Name of the span.
        otlp_attributes: Initial OTLP attributes for the span.
        tracer: Tracer used to create the underlying OpenTelemetry span.
        json_schema_properties: JSON schema properties describing the span's attributes.
    """
    self._span_name = span_name
    self._otlp_attributes = otlp_attributes
    self._tracer = tracer
    self._json_schema_properties = json_schema_properties

    # Mutable state populated while the span is used.
    self._added_attributes = False  # True once set_attribute is called; end() then attaches the JSON schema
    self._end_on_exit: bool | None = None
    self._token: None | object = None
    self._span: None | trace_api.Span = None  # set when the span is actually started
    # NOTE(review): this appears to go through an `end_on_exit` property setter defined
    # elsewhere, so `_end_on_exit` must be initialized above first — confirm before reordering.
    self.end_on_exit = True

end

end() -> None

Sets the current time as the span's end time.

The span's end time is the wall time at which the operation finished.

Only the first call to this method is recorded, further calls are ignored so you can call this within the span's context manager to end it before the context manager exits.

Source code in logfire/_internal/main.py
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
def end(self) -> None:
    """Sets the current time as the span's end time.

    The span's end time is the wall time at which the operation finished.

    Only the first call to this method is recorded; further calls are ignored, so
    it's safe to call this inside the span's context manager to end it before the
    context manager exits.
    """
    if self._span is None:  # pragma: no cover
        raise RuntimeError('Span has not been started')
    if not self._span.is_recording():
        # Sampled-out span: nothing to record.
        return
    with handle_internal_errors():
        if self._added_attributes:
            # Attach the accumulated JSON schema describing the user attributes.
            self._span.set_attribute(
                ATTRIBUTES_JSON_SCHEMA_KEY, attributes_json_schema(self._json_schema_properties)
            )
        self._span.end()

set_attribute

set_attribute(key: str, value: Any) -> None

Sets an attribute on the span.

Parameters:

Name Type Description Default

key

str

The key of the attribute.

required

value

Any

The value of the attribute.

required
Source code in logfire/_internal/main.py
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
@handle_internal_errors()
def set_attribute(self, key: str, value: Any) -> None:
    """Sets an attribute on the span.

    Args:
        key: The key of the attribute.
        value: The value of the attribute.
    """
    # Record that at least one attribute was added, so `end` attaches the JSON schema.
    self._added_attributes = True
    self._json_schema_properties[key] = create_json_schema(value, set())
    # Convert the user value into an OTLP-compatible one (may also rewrite the key).
    key, converted_value = set_user_attribute(self._otlp_attributes, key, value)
    if self._span is not None:  # pragma: no branch
        self._span.set_attribute(key, converted_value)

set_attributes

set_attributes(
    attributes: dict[str, AttributeValue]
) -> None

Sets the given attributes on the span.

Source code in logfire/_internal/main.py
1748
1749
1750
1751
def set_attributes(self, attributes: dict[str, otel_types.AttributeValue]) -> None:
    """Apply every key/value pair in `attributes` to the span via `set_attribute`."""
    for attr_key, attr_value in attributes.items():
        self.set_attribute(attr_key, attr_value)

record_exception

record_exception(
    exception: BaseException,
    attributes: Attributes = None,
    timestamp: int | None = None,
    escaped: bool = False,
) -> None

Records an exception as a span event.

Delegates to the OpenTelemetry SDK Span.record_exception method.

Source code in logfire/_internal/main.py
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
def record_exception(
    self,
    exception: BaseException,
    attributes: otel_types.Attributes = None,
    timestamp: int | None = None,
    escaped: bool = False,
) -> None:  # pragma: no cover
    """Records an exception as a span event.

    Delegates to the OpenTelemetry SDK `Span.record_exception` method.
    """
    span = self._span
    if span is None:
        raise RuntimeError('Span has not been started')

    # Bail out early for sampled-out spans: _record_exception is somewhat expensive.
    if not span.is_recording():
        return

    _record_exception(
        span,
        exception,
        attributes=attributes,
        timestamp=timestamp,
        escaped=escaped,
    )

set_level

set_level(level: LevelName | int)

Set the log level of this span.

Source code in logfire/_internal/main.py
1783
1784
1785
1786
1787
1788
1789
1790
@handle_internal_errors()
def set_level(self, level: LevelName | int):
    """Set the log level of this span."""
    level_attrs = log_level_attributes(level)
    if self._span is not None:
        self._span.set_attributes(level_attrs)
    else:
        # Span not started yet: stash the level so it's applied when the span starts.
        self._otlp_attributes.update(level_attrs)

ScrubbingOptions dataclass

ScrubbingOptions(
    callback: ScrubCallback | None = None,
    extra_patterns: Sequence[str] | None = None,
)

Options for redacting sensitive data.

callback class-attribute instance-attribute

callback: ScrubCallback | None = None

A function that is called for each match found by the scrubber. If it returns None, the value is redacted. Otherwise, the returned value replaces the matched value. The function accepts a single argument of type logfire.ScrubMatch.

extra_patterns class-attribute instance-attribute

extra_patterns: Sequence[str] | None = None

A sequence of regular expressions to detect sensitive data that should be redacted. For example, the default includes 'password', 'secret', and 'api[._ -]?key'. The specified patterns are combined with the default patterns.

ScrubMatch dataclass

ScrubMatch(
    path: JsonPath, value: Any, pattern_match: Match[str]
)

An object passed to a ScrubbingOptions.callback function.

path instance-attribute

path: JsonPath

The path to the value in the span being considered for redaction, e.g. ('attributes', 'password').

value instance-attribute

value: Any

The value in the span being considered for redaction, e.g. 'my_password'.

pattern_match instance-attribute

pattern_match: Match[str]

The regex match object indicating why the value is being redacted. Use pattern_match.group(0) to get the matched string.

LogfireLoggingHandler

LogfireLoggingHandler(
    level: int | str = NOTSET,
    fallback: Handler = StreamHandler(),
    logfire_instance: Logfire | None = None,
)

Bases: Handler

A logging handler that sends logs to Logfire.

Source code in logfire/integrations/logging.py
57
58
59
60
61
62
63
64
65
66
67
def __init__(
    self,
    level: int | str = NOTSET,
    fallback: LoggingHandler = StreamHandler(),
    logfire_instance: Logfire | None = None,
) -> None:
    """Initialize the handler.

    Args:
        level: Minimum logging level, as in `logging.Handler`.
        fallback: Handler used instead of Logfire when instrumentation is suppressed.
            NOTE(review): the default `StreamHandler()` is evaluated once at import time,
            so all instances that don't pass `fallback` share one handler object — confirm
            this sharing is intentional.
        logfire_instance: The Logfire instance to send logs to; defaults to the global one.
    """
    super().__init__(level=level)
    self.fallback = fallback
    # Tag emitted logs with this handler's scope suffix so they're attributed to the integration.
    self.logfire_instance = (logfire_instance or logfire.DEFAULT_LOGFIRE_INSTANCE).with_settings(
        custom_scope_suffix=self.custom_scope_suffix
    )

emit

emit(record: LogRecord) -> None

Send the log to Logfire.

Parameters:

Name Type Description Default

record

LogRecord

The log record to send.

required
Source code in logfire/integrations/logging.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def emit(self, record: LogRecord) -> None:
    """Send the log to Logfire.

    Args:
        record: The log record to send.
    """
    if is_instrumentation_suppressed():
        # Logfire output is suppressed; hand the record to the fallback handler instead.
        self.fallback.handle(record)
        return

    attributes = self.fill_attributes(record)
    msg_template = attributes.pop(ATTRIBUTES_MESSAGE_TEMPLATE_KEY, record.msg)
    level = LOGGING_TO_OTEL_LEVEL_NUMBERS.get(record.levelno, record.levelno)

    self.logfire_instance.log(
        msg_template=msg_template,
        level=level,
        attributes=attributes,
        exc_info=record.exc_info,
    )

fill_attributes

fill_attributes(record: LogRecord) -> dict[str, Any]

Fill the attributes to send to Logfire.

This method can be overridden to add more attributes.

Parameters:

Name Type Description Default

record

LogRecord

The log record.

required

Returns:

Type Description
dict[str, Any]

The attributes for the log record.

Source code in logfire/integrations/logging.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def fill_attributes(self, record: LogRecord) -> dict[str, Any]:
    """Fill the attributes to send to Logfire.

    This method can be overridden to add more attributes.

    Args:
        record: The log record.

    Returns:
        The attributes for the log record.
    """
    # Start from the record's own fields, dropping the standard logging internals.
    attributes = {key: val for key, val in vars(record).items() if key not in RESERVED_ATTRS}
    attributes.update(
        {
            'code.filepath': record.pathname,
            'code.lineno': record.lineno,
            'code.function': record.funcName,
        }
    )

    formatted_message, formatting_args = _format_message(record)
    attributes[ATTRIBUTES_MESSAGE_KEY] = formatted_message
    attributes.update(formatting_args)

    return attributes

StructlogProcessor

StructlogProcessor(
    *,
    console_log: bool = False,
    logfire_instance: Logfire | None = None
)

Logfire processor for structlog.

Source code in logfire/integrations/structlog.py
26
27
28
29
30
31
32
33
34
35
def __init__(
    self,
    *,
    console_log: bool = False,
    logfire_instance: Logfire | None = None,
) -> None:
    """Initialize the structlog processor.

    Args:
        console_log: Whether logs sent through this processor should also appear in the console.
        logfire_instance: The Logfire instance to send logs to; defaults to the global one.
    """
    self.console_log = console_log
    # Attribute emitted logs to the structlog integration via a scope suffix.
    self.logfire_instance = (logfire_instance or logfire.DEFAULT_LOGFIRE_INSTANCE).with_settings(
        custom_scope_suffix='structlog'
    )

__call__

__call__(
    logger: WrappedLogger, name: str, event_dict: EventDict
) -> EventDict

A middleware to process structlog event, and send it to Logfire.

Source code in logfire/integrations/structlog.py
37
38
39
40
41
42
43
44
45
46
47
48
49
def __call__(self, logger: WrappedLogger, name: str, event_dict: EventDict) -> EventDict:
    """A middleware to process structlog event, and send it to **Logfire**."""
    attributes = {key: val for key, val in event_dict.items() if key not in RESERVED_ATTRS}
    level = event_dict.get('level', 'info').lower()
    # NOTE: An event can be `None` in structlog. We may want to create a default msg in those cases.
    message = event_dict.get('event') or 'structlog event'
    attributes[ATTRIBUTES_MESSAGE_KEY] = message
    self.logfire_instance.log(
        level=level,  # type: ignore
        msg_template=message,
        attributes=attributes,
        console_log=self.console_log,
    )
    return event_dict

no_auto_trace

no_auto_trace(x: T) -> T

Decorator to prevent a function/class from being traced by logfire.install_auto_tracing.

This is useful for small functions that are called very frequently and would generate too much noise.

The decorator is detected at import time. Only @no_auto_trace or @logfire.no_auto_trace are supported. Renaming/aliasing either the function or module won't work. Neither will calling this indirectly via another function.

Any decorated function, or any function defined anywhere inside a decorated function/class, will be completely ignored by logfire.install_auto_tracing.

This decorator simply returns the argument unchanged, so there is zero runtime overhead.

Source code in logfire/_internal/auto_trace/rewrite_ast.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def no_auto_trace(x: T) -> T:
    """Mark a function/class so that `logfire.install_auto_tracing` skips it.

    Useful for small, very frequently called functions that would otherwise produce
    too much noise.

    Detection happens at import time, and only the literal forms `@no_auto_trace` or
    `@logfire.no_auto_trace` are recognized — renaming/aliasing the function or module,
    or applying the decorator indirectly through another function, won't work.

    The decorated function, and every function defined anywhere inside a decorated
    function/class, is completely ignored by `logfire.install_auto_tracing`.

    The argument is returned unchanged, so the decorator has zero runtime overhead.
    """
    return x  # pragma: no cover

configure

configure(
    *,
    send_to_logfire: (
        bool | Literal["if-token-present"] | None
    ) = None,
    token: str | None = None,
    service_name: str | None = None,
    service_version: str | None = None,
    console: ConsoleOptions | Literal[False] | None = None,
    config_dir: Path | str | None = None,
    data_dir: Path | str | None = None,
    additional_span_processors: (
        Sequence[SpanProcessor] | None
    ) = None,
    metrics: MetricsOptions | Literal[False] | None = None,
    scrubbing: (
        ScrubbingOptions | Literal[False] | None
    ) = None,
    inspect_arguments: bool | None = None,
    sampling: SamplingOptions | None = None,
    advanced: AdvancedOptions | None = None,
    **deprecated_kwargs: Unpack[DeprecatedKwargs]
) -> None

Configure the logfire SDK.

Parameters:

Name Type Description Default

send_to_logfire

bool | Literal['if-token-present'] | None

Whether to send logs to logfire.dev. Defaults to the LOGFIRE_SEND_TO_LOGFIRE environment variable if set, otherwise defaults to True. If if-token-present is provided, logs will only be sent if a token is present.

None

token

str | None

The project token. Defaults to the LOGFIRE_TOKEN environment variable.

None

service_name

str | None

Name of this service. Defaults to the LOGFIRE_SERVICE_NAME environment variable.

None

service_version

str | None

Version of this service. Defaults to the LOGFIRE_SERVICE_VERSION environment variable, or the current git commit hash if available.

None

console

ConsoleOptions | Literal[False] | None

Whether to control terminal output. If None uses the LOGFIRE_CONSOLE_* environment variables, otherwise defaults to ConsoleOption(colors='auto', indent_spans=True, include_timestamps=True, verbose=False). If False disables console output. It can also be disabled by setting LOGFIRE_CONSOLE environment variable to false.

None

config_dir

Path | str | None

Directory that contains the pyproject.toml file for this project. If None uses the LOGFIRE_CONFIG_DIR environment variable, otherwise defaults to the current working directory.

None

data_dir

Path | str | None

Directory to store credentials, and logs. If None uses the LOGFIRE_CREDENTIALS_DIR environment variable, otherwise defaults to '.logfire'.

None

additional_span_processors

Sequence[SpanProcessor] | None

Span processors to use in addition to the default processor which exports spans to Logfire's API.

None

metrics

MetricsOptions | Literal[False] | None

Set to False to disable sending all metrics, or provide a MetricsOptions object to configure metrics, e.g. additional metric readers.

None

scrubbing

ScrubbingOptions | Literal[False] | None

Options for scrubbing sensitive data. Set to False to disable.

None

inspect_arguments

bool | None

Whether to enable f-string magic. If None uses the LOGFIRE_INSPECT_ARGUMENTS environment variable. Defaults to True if and only if the Python version is at least 3.11.

None

advanced

AdvancedOptions | None

Advanced options primarily used for testing by Logfire developers.

None

sampling

SamplingOptions | None

Sampling options. See the sampling guide.

None
Source code in logfire/_internal/config.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
def configure(  # noqa: D417
    *,
    send_to_logfire: bool | Literal['if-token-present'] | None = None,
    token: str | None = None,
    service_name: str | None = None,
    service_version: str | None = None,
    console: ConsoleOptions | Literal[False] | None = None,
    config_dir: Path | str | None = None,
    data_dir: Path | str | None = None,
    additional_span_processors: Sequence[SpanProcessor] | None = None,
    metrics: MetricsOptions | Literal[False] | None = None,
    scrubbing: ScrubbingOptions | Literal[False] | None = None,
    inspect_arguments: bool | None = None,
    sampling: SamplingOptions | None = None,
    advanced: AdvancedOptions | None = None,
    **deprecated_kwargs: Unpack[DeprecatedKwargs],
) -> None:
    """Configure the logfire SDK.

    Args:
        send_to_logfire: Whether to send logs to logfire.dev. Defaults to the `LOGFIRE_SEND_TO_LOGFIRE` environment
            variable if set, otherwise defaults to `True`. If `if-token-present` is provided, logs will only be sent if
            a token is present.
        token: The project token. Defaults to the `LOGFIRE_TOKEN` environment variable.
        service_name: Name of this service. Defaults to the `LOGFIRE_SERVICE_NAME` environment variable.
        service_version: Version of this service. Defaults to the `LOGFIRE_SERVICE_VERSION` environment variable, or the
            current git commit hash if available.
        console: Whether to control terminal output. If `None` uses the `LOGFIRE_CONSOLE_*` environment variables,
            otherwise defaults to `ConsoleOption(colors='auto', indent_spans=True, include_timestamps=True, verbose=False)`.
            If `False` disables console output. It can also be disabled by setting `LOGFIRE_CONSOLE` environment variable to `false`.
        config_dir: Directory that contains the `pyproject.toml` file for this project. If `None` uses the
            `LOGFIRE_CONFIG_DIR` environment variable, otherwise defaults to the current working directory.
        data_dir: Directory to store credentials and logs. If `None` uses the `LOGFIRE_CREDENTIALS_DIR` environment variable, otherwise defaults to `'.logfire'`.
        additional_span_processors: Span processors to use in addition to the default processor which exports spans to Logfire's API.
        metrics: Set to `False` to disable sending all metrics,
            or provide a `MetricsOptions` object to configure metrics, e.g. additional metric readers.
        scrubbing: Options for scrubbing sensitive data. Set to `False` to disable.
        inspect_arguments: Whether to enable
            [f-string magic](https://logfire.pydantic.dev/docs/guides/onboarding-checklist/add-manual-tracing/#f-strings).
            If `None` uses the `LOGFIRE_INSPECT_ARGUMENTS` environment variable.
            Defaults to `True` if and only if the Python version is at least 3.11.
        sampling: Sampling options. See the [sampling guide](https://logfire.pydantic.dev/docs/guides/advanced/sampling/).
        advanced: Advanced options primarily used for testing by Logfire developers.
    """
    # --- Deprecated keyword handling ---------------------------------------
    # Each removed/renamed kwarg is popped off `deprecated_kwargs` one at a
    # time. Hard removals raise ValueError; renames emit a DeprecationWarning
    # and are translated into the new option objects. Anything left over at
    # the end is a genuinely unknown kwarg and raises TypeError.

    # `processors` was removed outright in favour of `additional_span_processors`.
    processors = deprecated_kwargs.pop('processors', None)  # type: ignore
    if processors is not None:  # pragma: no cover
        raise ValueError(
            'The `processors` argument has been replaced by `additional_span_processors`. '
            'Set `send_to_logfire=False` to disable the default processor.'
        )

    # `metric_readers` was removed in favour of `metrics=MetricsOptions(...)`.
    metric_readers = deprecated_kwargs.pop('metric_readers', None)  # type: ignore
    if metric_readers is not None:  # pragma: no cover
        raise ValueError(
            'The `metric_readers` argument has been replaced by '
            '`metrics=logfire.MetricsOptions(additional_readers=[...])`. '
            'Set `send_to_logfire=False` to disable the default metric reader.'
        )

    # `collect_system_metrics` was removed; the error message differs depending
    # on whether the caller was trying to disable (False) or enable it.
    collect_system_metrics = deprecated_kwargs.pop('collect_system_metrics', None)  # type: ignore
    if collect_system_metrics is False:
        raise ValueError(
            'The `collect_system_metrics` argument has been removed. '
            'System metrics are no longer collected by default.'
        )

    if collect_system_metrics is not None:
        raise ValueError(
            'The `collect_system_metrics` argument has been removed. '
            'Use `logfire.instrument_system_metrics()` instead.'
        )

    # `scrubbing_callback`/`scrubbing_patterns` fold into `scrubbing`; mixing
    # old and new style is ambiguous and therefore an error.
    scrubbing_callback = deprecated_kwargs.pop('scrubbing_callback', None)  # type: ignore
    scrubbing_patterns = deprecated_kwargs.pop('scrubbing_patterns', None)  # type: ignore
    if scrubbing_callback or scrubbing_patterns:
        if scrubbing is not None:
            raise ValueError(
                'Cannot specify `scrubbing` and `scrubbing_callback` or `scrubbing_patterns` at the same time. '
                'Use only `scrubbing`.'
            )
        warnings.warn(
            'The `scrubbing_callback` and `scrubbing_patterns` arguments are deprecated. '
            'Use `scrubbing=logfire.ScrubbingOptions(callback=..., extra_patterns=[...])` instead.',
        )
        scrubbing = ScrubbingOptions(callback=scrubbing_callback, extra_patterns=scrubbing_patterns)  # type: ignore

    # `project_name` is silently ignored beyond the warning — it is no longer needed.
    project_name = deprecated_kwargs.pop('project_name', None)  # type: ignore
    if project_name is not None:
        warnings.warn(
            'The `project_name` argument is deprecated and not needed.',
        )

    # `trace_sample_rate` folds into `sampling.head`; mixing old and new style is an error.
    trace_sample_rate: float | None = deprecated_kwargs.pop('trace_sample_rate', None)  # type: ignore
    if trace_sample_rate is not None:
        if sampling:
            raise ValueError(
                'Cannot specify both `trace_sample_rate` and `sampling`. '
                'Use `sampling.head` instead of `trace_sample_rate`.'
            )
        else:
            sampling = SamplingOptions(head=trace_sample_rate)
            warnings.warn(
                'The `trace_sample_rate` argument is deprecated. '
                'Use `sampling=logfire.SamplingOptions(head=...)` instead.',
            )

    # `show_summary` is ignored beyond the warning; console options replace it.
    show_summary = deprecated_kwargs.pop('show_summary', None)  # type: ignore
    if show_summary is not None:  # pragma: no cover
        warnings.warn(
            'The `show_summary` argument is deprecated. '
            'Use `console=False` or `console=logfire.ConsoleOptions(show_project_link=False)` instead.',
        )

    # These three each fold into `advanced=AdvancedOptions(...)`.
    for key in ('base_url', 'id_generator', 'ns_timestamp_generator'):
        value: Any = deprecated_kwargs.pop(key, None)  # type: ignore
        if value is None:
            continue
        if advanced is not None:
            raise ValueError(f'Cannot specify `{key}` and `advanced`. Use only `advanced`.')
        # (this means that specifying two deprecated advanced kwargs at the same time will raise an error)
        advanced = AdvancedOptions(**{key: value})
        warnings.warn(
            f'The `{key}` argument is deprecated. Use `advanced=logfire.AdvancedOptions({key}=...)` instead.',
            stacklevel=2,
        )

    # `additional_metric_readers` folds into `metrics=MetricsOptions(...)`.
    additional_metric_readers: Any = deprecated_kwargs.pop('additional_metric_readers', None)  # type: ignore
    if additional_metric_readers:
        if metrics is not None:
            raise ValueError(
                'Cannot specify both `additional_metric_readers` and `metrics`. '
                'Use `metrics=logfire.MetricsOptions(additional_readers=[...])` instead.'
            )
        warnings.warn(
            'The `additional_metric_readers` argument is deprecated. '
            'Use `metrics=logfire.MetricsOptions(additional_readers=[...])` instead.',
        )
        metrics = MetricsOptions(additional_readers=additional_metric_readers)

    # `pydantic_plugin` is forwarded to the pydantic integration for backward compatibility.
    pydantic_plugin: Any = deprecated_kwargs.pop('pydantic_plugin', None)  # type: ignore
    if pydantic_plugin is not None:
        warnings.warn(
            'The `pydantic_plugin` argument is deprecated. Use `logfire.instrument_pydantic()` instead.',
        )
        from logfire.integrations.pydantic import set_pydantic_plugin_config

        set_pydantic_plugin_config(pydantic_plugin)

    # Anything still in `deprecated_kwargs` was never a valid argument.
    if deprecated_kwargs:
        raise TypeError(f'configure() got unexpected keyword arguments: {", ".join(deprecated_kwargs)}')

    # Hand the (now fully normalized) options to the global configuration object.
    GLOBAL_CONFIG.configure(
        send_to_logfire=send_to_logfire,
        token=token,
        service_name=service_name,
        service_version=service_version,
        console=console,
        metrics=metrics,
        config_dir=Path(config_dir) if config_dir else None,
        data_dir=Path(data_dir) if data_dir else None,
        additional_span_processors=additional_span_processors,
        scrubbing=scrubbing,
        inspect_arguments=inspect_arguments,
        sampling=sampling,
        advanced=advanced,
    )

load_spans_from_file

load_spans_from_file(
    file_path: str | Path | IO[bytes] | None,
) -> Iterator[ExportTraceServiceRequest]

Load a backup file.

Parameters:

Name Type Description Default

file_path

str | Path | IO[bytes] | None

The path to the backup file.

required

Raises:

Type Description
ValueError

If the file is not a valid backup file.

Returns:

Type Description
Iterator[ExportTraceServiceRequest]

An iterator over each ExportTraceServiceRequest message in the backup file.

Source code in logfire/_internal/exporters/file.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def load_file(file_path: str | Path | IO[bytes] | None) -> Iterator[ExportTraceServiceRequest]:
    """Load a backup file.

    Args:
        file_path: The path to the backup file.

    Raises:
        ValueError: If the file is not a valid backup file.

    Returns:
        An iterator over each `ExportTraceServiceRequest` message in the backup file.
    """
    # Normalize the argument to either a Path or an already-open binary stream.
    if file_path is None:  # pragma: no cover
        source: Path | IO[bytes] = Path(DEFAULT_FALLBACK_FILE_NAME)
    elif isinstance(file_path, str):  # pragma: no cover
        source = Path(file_path)
    else:
        source = file_path
    # A Path is opened (and closed) here; a caller-supplied stream is used as-is
    # but still closed by the `with` block, matching the original behavior.
    with source.open('rb') if isinstance(source, Path) else source as stream:
        parser = FileParser()
        # Feed the parser in chunks of its preferred size, yielding each decoded
        # message as it becomes available.
        while chunk := stream.read(parser.get_suggested_read_size()):
            yield from parser.push(chunk)
        # EOF: let the parser validate that the file didn't end mid-message.
        parser.finish()

suppress_instrumentation

suppress_instrumentation()

Context manager to suppress all logs/spans generated by logfire or OpenTelemetry.

Source code in logfire/_internal/utils.py
242
243
244
245
246
247
248
249
250
251
252
@contextmanager
def suppress_instrumentation():
    """Context manager to suppress all logs/spans generated by logfire or OpenTelemetry."""
    # Build a copy of the current OTel context with every suppression key set.
    suppressed_ctx = context.get_current()
    for suppression_key in SUPPRESS_INSTRUMENTATION_CONTEXT_KEYS:
        suppressed_ctx = context.set_value(suppression_key, True, suppressed_ctx)
    # Attach it for the duration of the `with` block and always restore the
    # previous context afterwards, even if the body raises.
    attach_token = context.attach(suppressed_ctx)
    try:
        yield
    finally:
        context.detach(attach_token)