
Parsers

Parsers are responsible for two distinct stages:

  • turning raw log lines into structured records
  • turning structured message text into templates

This page is the reference for both the built-in parser implementations and the protocols they satisfy.

>>> from anomalog.parsers import BGLParser, Drain3Parser, IdentityTemplateParser
>>> BGLParser.name
'bgl'
>>> Drain3Parser.name
'drain3'
>>> IdentityTemplateParser("demo").inference("node 7 failed")
('node 7 failed', [])

anomalog.parsers

Public parser package.

BGLParser dataclass

Bases: StructuredParser

Parse Blue Gene/L log lines into structured fields with an anomaly flag.

parse_line(raw_line)

Parse a single BGL line; return None for unparseable lines.

Parameters:

  • raw_line (str, required): Raw BGL log line to parse.
Examples:

>>> sample = (
...     "- 1117838570 2005.06.03 R02-M1-N0-C:J12-U11 "
...     "2005-06-03-15.42.50.363779 R02-M1-N0-C:J12-U11 "
...     "RAS KERNEL INFO cache parity corrected"
... )
>>> parsed = BGLParser().parse_line(sample)
>>> (parsed.entity_id, parsed.anomalous)  # dash prefix => normal
('R02-M1-N0-C:J12-U11', 0)

Returns:

  BaseStructuredLine | None: Parsed structured record, or None when the line does not match the expected format.

Drain3Parser

Bases: TemplateParser

Drain3-based template miner with Prefect asset caching.

Instances accept an optional dataset name plus explicit config and cache paths so trained state can be persisted per dataset.

Attributes:

  • name (ClassVar[str]): Registry name for the built-in Drain3 parser.

Parameters:

  • dataset_name (str | None, default None): Optional dataset name used to scope persisted Drain3 state.
  • config_file (Path | None, default None): Optional Drain3 config file override.
  • cache_path (Path | None, default None): Optional explicit cache directory override.

cache_file_path property

Return the resolved cache file path for this parser instance.

Raises:

  • ValueError: If the parser has not been bound to a dataset yet.

resolved_cache_path property

Return the on-disk cache directory for this parser instance.

Raises:

  • ValueError: If the parser has not been bound to a dataset yet.

inference(unstructured_text)

Return template and parameters for a single unstructured log line.

Parameters:

  • unstructured_text (UntemplatedText, required): Raw untemplated log line to match against the trained miner.

Returns:

  tuple[LogTemplate, ExtractedParameters]: Matched template and extracted parameter values.

Raises:

  • ValueError: If the parser has not been trained yet.

train(untemplated_text_iterator)

Train Drain3 on an iterator of untemplated log lines.
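The train/inference contract can be sketched with a toy miner that masks numeric tokens, standing in for the real Drain3 clustering machinery. This is a sketch of the protocol only; the class name and masking rule are illustrative assumptions.

```python
from typing import Iterator

# Toy stand-in for a template miner: masks numeric tokens with "<*>".
# Sketches the documented train/inference contract, including the
# ValueError raised when inference runs before training.
class ToyTemplateMiner:
    def __init__(self) -> None:
        self._trained = False

    def train(self, untemplated_text_iterator: Iterator[str]) -> None:
        for _ in untemplated_text_iterator:
            pass  # a real miner would cluster lines into templates here
        self._trained = True

    def inference(self, unstructured_text: str) -> tuple[str, list[str]]:
        if not self._trained:
            raise ValueError("parser has not been trained yet")
        tokens = unstructured_text.split()
        params = [tok for tok in tokens if tok.isdigit()]
        template = " ".join("<*>" if tok.isdigit() else tok for tok in tokens)
        return template, params

miner = ToyTemplateMiner()
miner.train(iter(["node 7 failed", "node 9 failed"]))
```

After training, `miner.inference("node 12 failed")` yields the masked template together with the extracted parameter values, matching the (template, parameters) tuple shape documented above.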

HDFSV1Parser dataclass

Bases: StructuredParser

Parse HDFS v1 log lines into structured fields.

parse_line(raw_line)

Parse a single HDFS v1 line; return None for unparseable lines.

Parameters:

  • raw_line (str, required): Raw HDFS log line to parse.

Examples:

>>> line = (
...     "081109 203518 143 INFO dfs.DataNode$DataXceiver: "
...     "Receiving block blk_-160 src: /10.0.0.1:54106 "
...     "dest: /10.0.0.2:50010"
... )
>>> parsed = HDFSV1Parser().parse_line(line)
>>> parsed.entity_id, parsed.anomalous, parsed.untemplated_message_text[:13]
('blk_-160', None, 'INFO dfs.Data')

Returns:

  BaseStructuredLine | None: Parsed structured record, or None when the line does not match the expected format.

IdentityTemplateParser dataclass

Bases: TemplateParser

No-op template parser that returns the input string as its template.

inference(unstructured_text)

Return the raw text as the template with no parameters.

Parameters:

  • unstructured_text (UntemplatedText, required): Raw log text to treat as its own template.

Examples:

>>> IdentityTemplateParser("demo").inference("hello")
('hello', [])

Returns:

  tuple[LogTemplate, ExtractedParameters]: Raw text and an empty parameter list.

train(untemplated_text_iterator)

No-op training for identity parser.
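Because both methods are no-ops, the whole parser fits in a few lines. The following is a plausible shape reconstructed from the documented behaviour, not the real class:

```python
from dataclasses import dataclass
from typing import Iterator

# Plausible shape of the identity parser, reconstructed from its
# documented behaviour; the real class may differ in detail.
@dataclass
class IdentityTemplateParserSketch:
    dataset_name: str

    def train(self, untemplated_text_iterator: Iterator[str]) -> None:
        pass  # nothing to learn: every line is its own template

    def inference(self, unstructured_text: str) -> tuple[str, list[str]]:
        # Return the raw text unchanged, with no extracted parameters.
        return unstructured_text, []
```

This makes the identity parser a useful baseline: downstream code sees the same (template, parameters) interface as Drain3, with zero mining cost.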

ParquetStructuredSink dataclass

Bases: StructuredSink

StructuredSink backed by partitioned Parquet datasets.

Provides efficient iteration, windowing helpers, and label-aware counts for downstream anomaly workflows.

count_entities_by_label(label_for_group)

Return counts of normal and total distinct entity ids.

Parameters:

  • label_for_group (Callable[[str], int | None], required): Lookup that maps each entity id to its anomaly label.

Returns:

  EntityLabelCounts: Normal and total distinct entity counts.

count_rows()

Return total number of structured rows.

Returns:

  int: Total number of structured rows.

iter_entity_sequences()

Yield sequences grouped by entity bucket, preserving input order.

Returns:

  Callable[[], Iterator[Collection[StructuredLine]]]: Callable producing entity-grouped windows of structured rows.

iter_fixed_window_sequences(window_size, step_size=None)

Yield sequences of fixed window size over ordered rows.

Parameters:

  • window_size (int, required): Number of rows in each emitted window.
  • step_size (int | None, default None): Optional step between successive windows. Defaults to window_size.

Returns:

  Callable[[], Iterator[Collection[StructuredLine]]]: Callable producing fixed-size row windows.

iter_structured_lines(columns=None, *, filter_expr=None, batch_size=None)

Iterate over StructuredLine objects with optional column projection.

Parameters:

  • columns (Sequence[str] | None, default None): Optional projected column names to load from parquet.
  • filter_expr (Expression | None, default None): Optional PyArrow dataset filter expression.
  • batch_size (int | None, default None): Optional scanner batch size override.

Returns:

  Callable[[], Iterator[StructuredLine]]: Callable producing projected structured rows from the parquet dataset.

iter_time_window_sequences(time_span_ms, step_span_ms=None)

Yield sequences grouped by sliding time windows.

Parameters:

  • time_span_ms (int, required): Width of each window in milliseconds.
  • step_span_ms (int | None, default None): Optional step between successive windows. Defaults to time_span_ms.

Returns:

  Callable[[], Iterator[Collection[StructuredLine]]]: Callable producing time-window grouped rows.

load_inline_label_cache()

Load sparse inline labels directly from parquet batches.

Returns:

  tuple[dict[int, int], dict[str, int]]: Sparse per-line and per-group anomaly labels.

structured_data_cache(dataset_name)

Return the cache directory for this dataset.

Parameters:

  • dataset_name (str, required): Dataset name whose parquet cache should be used.

Returns:

  Path: Structured-parquet cache directory for the dataset.

timestamp_bounds()

Return min and max timestamps present in the dataset.

Returns:

  tuple[int | None, int | None]: Minimum and maximum timestamps, if any.

write_structured_lines(_workers=None)

Parse raw logs and persist structured lines to Parquet.

Parameters:

  • _workers (int | None, default None): Reserved worker-count override; currently unused by this sink implementation.

Returns:

  bool: Whether any anomalous rows were observed during parsing.