Skip to content

bytes

genlm.bytes

ByteBeamState

Bases: StatefulByteLM

Represents the state of the beam during byte-level language modeling.

Tracks multiple candidate states and their probabilities, pruning low-probability candidates.

Parameters:

Name Type Description Default
states list[LazyTrieState]

List of candidate states to track

required
params BeamParams

Parameters controlling beam search behavior

required
Source code in genlm/bytes/byte_lm/beam.py
class ByteBeamState(StatefulByteLM):
    """Represents the state of the beam during byte-level language modeling.

    Tracks multiple candidate states and their probabilities, pruning low-probability
    candidates.

    Args:
        states (list[LazyTrieState]): List of candidate states to track
        params (BeamParams): Parameters controlling beam search behavior
    """

    def __init__(self, states, params):
        # Candidates are kept in descending weight order so that truncating
        # the list (see `prune`) keeps the heaviest ones.
        self.states = sorted(states, key=lambda b: -b.weight)
        self.params = params

    @classmethod
    async def initial(cls, llm, params, trie_opts=None):
        """Creates initial beam state.

        Args:
            llm (StatefulTokenizedLM): Token-level language model to use.
            params (BeamParams): Beam search parameters.
            trie_opts (dict, optional): Additional keyword arguments passed to
                AsyncTokenByteTrie.from_vocab. For example, {"max_batch_size": 100}.
                The dict is copied, never modified; any "eos_tokens" entry is
                overridden by `params.eos_tokens`.

        Returns:
            (ByteBeamState): Initial beam state.
        """
        # Handle EOS tokens. Build a fresh dict so the caller's `trie_opts`
        # is not mutated as a side effect of creating a beam.
        trie_opts = {**(trie_opts or {}), "eos_tokens": params.eos_tokens}

        async_trie = AsyncTokenByteTrie.from_vocab(
            get_byte_vocab(llm.tokenizer), **trie_opts
        )
        state = LazyTrieState.initial(llm, async_trie, mode=TrieMode.WITH_EOS)
        return cls([await state.materialize()], params)

    def __iter__(self):
        """Iterates over the candidate states, heaviest first."""
        return iter(self.states)

    def __len__(self):
        """Returns the number of candidate states in the beam."""
        return len(self.states)

    @cached_property
    def logZ(self):
        """Estimate of the partition function (sum of weights) for current beam.
        This is the estimate of the prefix probability of the bytes consumed so far.
        """
        return logsumexp([state.weight for state in self])

    async def __lshift__(self, a):
        """Advances the beam state with a new byte.

        Args:
            a (int): Byte to add to states.

        Returns:
            (ByteBeamState): New beam state after processing the byte. If no
                candidate can consume the byte and healing is disabled or
                fails, the returned beam is empty.
        """
        # Candidates that can consume the byte within their current token.
        new_states = []
        for state in self:
            if new_state := state << a:
                new_states.append(new_state)

        # Candidates that first end their current token (EOT) and then
        # consume the byte at the start of a new token.
        logZ = logsumexp([s.weight for s in new_states]) if new_states else -np.inf
        for state in await self.extend(logZ):
            if new_state := state << a:
                new_states.append(new_state)

        new_state = ByteBeamState(new_states, self.params)

        # If advancing would empty the beam, do adaptive healing if enabled
        if self.params.heal and len(new_state) == 0:
            healed = await self._adaptive_heal(a)
            if healed is not None:
                if self.params.verbose:
                    print("[heal] Applied adaptive token healing")
                return healed

        if self.params.verbose:
            print()
            print(new_state)

        return new_state

    async def logp_next(self):
        """Computes log probabilities for the next byte across all beam candidates.

        Returns:
            (LazyByteProbs): Log probabilities for next possible bytes.

        Raises:
            AssertionError: If the beam is empty.
        """
        assert len(self) > 0, "Beam is empty"

        # Rows for the current (non-extended) candidates come first ...
        logqs = []
        for state in self:
            logqs.append(state.logp_next.ps + state.weight)

        # ... followed by rows for the candidates obtained by ending a token.
        for state in await self.extend(self.logZ):
            logqs.append(state.logp_next.ps + state.weight)

        logqs = np.stack(logqs, axis=0)  # shape: (num_states, array_size)
        # mask EOT positions of non-extended (EOT is at index 256)
        logqs[: len(self), -2] = -np.inf
        logps = scipy_logsumexp(logqs, axis=0)

        # Renormalize so the result is a log-distribution.
        return LazyByteProbs(logps - logsumexp(logps))

    async def extend(self, logZ):
        """Attempts to advance each candidate in the beam by a token (EOT).

        For each candidate with EOT available, this ends the current token and
        starts a new one in preparation for the next byte.

        Args:
            logZ (float): Current estimate of the partition function for pruning

        Returns:
            (list[LazyTrieState]): New candidate states after extension
        """
        # Fold the weight of every extension into the normalizer before
        # applying the threshold, so pruning is relative to the total.
        extends = []
        for state in self:
            if new_state := state.extend():
                logZ = np.logaddexp(logZ, new_state.weight)
                extends.append(new_state)

        # Only materialize extensions that survive the pruning threshold.
        coros = []
        for state in extends:
            if state.weight - logZ > self.params.log_prune_threshold:
                coros.append(state.materialize())

        return await asyncio.gather(*coros)

    def prune(self):
        """Prunes beam to maintain beam width and probability threshold.

        Returns:
            (ByteBeamState): New state with pruned candidates.
        """
        # `self` iterates in descending weight order, so slicing to K keeps
        # the heaviest candidates that pass the threshold.
        new_states = [
            state
            for state in self
            if state.weight - self.logZ > self.params.log_prune_threshold
        ][: self.params.K]
        return ByteBeamState(new_states, self.params)

    def __repr__(self):
        desc = colors.bold % f"Z: {self.logZ}\n" + colors.bold % "Candidates:\n"
        for state in self:
            # Normalized probability of this candidate within the beam.
            P = np.exp(state.weight - self.logZ)
            color = colors.green if P > self.params.prune_threshold else colors.red
            desc += f"({color % f'{P:.4f}'}) {repr(state)}\n"
        return desc

    def with_mode(self, mode):
        """Create a new beam state with specified trie mode.

        Args:
            mode (TrieMode): Trie mode for the new beam state

        Returns:
            (ByteBeamState): New beam state with updated mode
        """
        return ByteBeamState(
            states=[state.with_mode(mode) for state in self.states],
            params=self.params,
        )

    async def prefill(self, bs):
        """Prefill the beam on a sequence of bytes.

        During prefilling, EOS tokens are treated as normal tokens and don't cause termination.

        Args:
            bs (bytes): Byte sequence to prefill on

        Returns:
            (ByteBeamState): New beam state after prefilling
        """
        # Create no_eos beam for prefill (EOS tokens treated as normal)
        no_eos_beam = self.with_mode(TrieMode.WITHOUT_EOS)

        # Do prefill operations on no_eos beam; the beam is pruned before
        # each byte is consumed.
        for b in bs:
            no_eos_beam = await (no_eos_beam.prune() << b)

        # Return as with_eos beam (EOS tokens get special handling after prefill)
        return no_eos_beam.with_mode(TrieMode.WITH_EOS)

    async def cleanup(self):
        """Cleans up resources used by the candidates."""
        await asyncio.gather(*[state.cleanup() for state in self])

    async def _adaptive_heal(self, next_byte: int):
        """Attempt adaptive token healing using TokenHealer.

        Candidates are tried in beam order and the first successful heal wins.

        Returns a new single-candidate beam advanced by `next_byte` if healing
        succeeds, else None.
        """
        healer = TokenHealer(
            max_backoff=self.params.heal_max_backoff,
            max_splits=self.params.heal_max_splits,
            verbose=self.params.verbose,
        )

        for state in self.states:
            healed_state = await healer.try_heal(state, next_byte)
            if healed_state is not None:
                return ByteBeamState([healed_state], self.params)

        return None

initial(llm, params, trie_opts=None) async classmethod

Creates initial beam state.

Parameters:

Name Type Description Default
llm StatefulTokenizedLM

Token-level language model to use.

required
params BeamParams

Beam search parameters.

required
trie_opts dict

Additional keyword arguments passed to AsyncTokenByteTrie.from_vocab. For example, {"max_batch_size": 100}.

None

Returns:

Type Description
ByteBeamState

Initial beam state.

Source code in genlm/bytes/byte_lm/beam.py
@classmethod
async def initial(cls, llm, params, trie_opts=None):
    """Creates initial beam state.

    Args:
        llm (StatefulTokenizedLM): Token-level language model to use.
        params (BeamParams): Beam search parameters.
        trie_opts (dict, optional): Additional keyword arguments passed to
            AsyncTokenByteTrie.from_vocab. For example, {"max_batch_size": 100}.
            The dict is copied, never modified; any "eos_tokens" entry is
            overridden by `params.eos_tokens`.

    Returns:
        (ByteBeamState): Initial beam state.
    """
    # Handle EOS tokens. Build a fresh dict so the caller's `trie_opts`
    # is not mutated as a side effect of creating a beam.
    trie_opts = {**(trie_opts or {}), "eos_tokens": params.eos_tokens}

    async_trie = AsyncTokenByteTrie.from_vocab(
        get_byte_vocab(llm.tokenizer), **trie_opts
    )
    state = LazyTrieState.initial(llm, async_trie, mode=TrieMode.WITH_EOS)
    return cls([await state.materialize()], params)

logZ cached property

Estimate of the partition function (sum of weights) for current beam. This is the estimate of the prefix probability of the bytes consumed so far.

__lshift__(a) async

Advances the beam state with a new byte.

Parameters:

Name Type Description Default
a int

Byte to add to states.

required

Returns:

Type Description
ByteBeamState

New beam state after processing the byte.

Source code in genlm/bytes/byte_lm/beam.py
async def __lshift__(self, a):
    """Consume one byte and return the resulting beam.

    Args:
        a (int): Byte to add to states.

    Returns:
        (ByteBeamState): Beam obtained after the byte has been processed.
    """
    # First pass: candidates that accept the byte inside their current token.
    advanced = [nxt for nxt in (cand << a for cand in self) if nxt]

    # Normalizer over the first-pass survivors, used when pruning extensions.
    if advanced:
        logZ = logsumexp([cand.weight for cand in advanced])
    else:
        logZ = -np.inf

    # Second pass: candidates that end their token first, then accept the byte.
    extended = await self.extend(logZ)
    advanced.extend(nxt for nxt in (cand << a for cand in extended) if nxt)

    beam = ByteBeamState(advanced, self.params)

    # An empty beam triggers adaptive healing when it is enabled.
    if self.params.heal and len(beam) == 0:
        healed = await self._adaptive_heal(a)
        if healed is not None:
            if self.params.verbose:
                print("[heal] Applied adaptive token healing")
            return healed

    if self.params.verbose:
        print()
        print(beam)

    return beam

logp_next() async

Computes log probabilities for the next byte across all beam candidates.

Returns:

Type Description
LazyByteProbs

Log probabilities for next possible bytes.

Source code in genlm/bytes/byte_lm/beam.py
async def logp_next(self):
    """Return the beam's log-distribution over the next byte.

    Returns:
        (LazyByteProbs): Log probabilities for next possible bytes.
    """
    assert len(self) > 0, "Beam is empty"

    n_current = len(self)

    # One weighted row per current candidate, then one per extended candidate.
    rows = [cand.logp_next.ps + cand.weight for cand in self]
    extended = await self.extend(self.logZ)
    rows.extend(cand.logp_next.ps + cand.weight for cand in extended)

    stacked = np.stack(rows, axis=0)  # shape: (num_states, array_size)
    # mask EOT positions of non-extended (EOT is at index 256)
    stacked[:n_current, -2] = -np.inf
    combined = scipy_logsumexp(stacked, axis=0)

    normalizer = logsumexp(combined)
    return LazyByteProbs(combined - normalizer)

extend(logZ) async

Attempts to advance each candidate in the beam by a token (EOT).

For each candidate with EOT available, this ends the current token and starts a new one in preparation for the next byte.

Parameters:

Name Type Description Default
logZ float

Current estimate of the partition function for pruning

required

Returns:

Type Description
list[LazyTrieState]

New candidate states after extension

Source code in genlm/bytes/byte_lm/beam.py
async def extend(self, logZ):
    """Try to end the current token (EOT) for every candidate in the beam.

    Each candidate that has EOT available yields a new candidate positioned
    at the start of a fresh token, ready for the next byte.

    Args:
        logZ (float): Current estimate of the partition function for pruning

    Returns:
        (list[LazyTrieState]): Materialized extended candidates that passed
            the pruning threshold
    """
    # Collect every available extension (falsy results mean "no EOT here").
    candidates = [ext for ext in (state.extend() for state in self) if ext]

    # Add the extensions' weights to the normalizer, in beam order.
    for ext in candidates:
        logZ = np.logaddexp(logZ, ext.weight)

    # Materialize only the extensions above the pruning threshold.
    pending = [
        ext.materialize()
        for ext in candidates
        if ext.weight - logZ > self.params.log_prune_threshold
    ]
    return await asyncio.gather(*pending)

prune()

Prunes beam to maintain beam width and probability threshold.

Returns:

Type Description
ByteBeamState

New state with pruned candidates.

Source code in genlm/bytes/byte_lm/beam.py
def prune(self):
    """Drop candidates below the probability threshold and cap the beam at K.

    Returns:
        (ByteBeamState): New state with pruned candidates.
    """
    survivors = []
    for cand in self:
        # Keep candidates whose normalized log-weight exceeds the threshold.
        if cand.weight - self.logZ > self.params.log_prune_threshold:
            survivors.append(cand)

    width = self.params.K
    return ByteBeamState(survivors[:width], self.params)

with_mode(mode)

Create a new beam state with specified trie mode.

Parameters:

Name Type Description Default
mode TrieMode

Trie mode for the new beam state

required

Returns:

Type Description
ByteBeamState

New beam state with updated mode

Source code in genlm/bytes/byte_lm/beam.py
def with_mode(self, mode):
    """Build a beam whose candidates all use the given trie mode.

    Args:
        mode (TrieMode): Trie mode for the new beam state

    Returns:
        (ByteBeamState): New beam state with updated mode
    """
    converted = []
    for cand in self.states:
        converted.append(cand.with_mode(mode))
    return ByteBeamState(states=converted, params=self.params)

prefill(bs) async

Prefill the beam on a sequence of bytes.

During prefilling, EOS tokens are treated as normal tokens and don't cause termination.

Parameters:

Name Type Description Default
bs bytes

Byte sequence to prefill on

required

Returns:

Type Description
ByteBeamState

New beam state after prefilling

Source code in genlm/bytes/byte_lm/beam.py
async def prefill(self, bs):
    """Advance the beam over a fixed byte sequence.

    While prefilling, EOS tokens are treated as normal tokens and don't cause termination.

    Args:
        bs (bytes): Byte sequence to prefill on

    Returns:
        (ByteBeamState): New beam state after prefilling
    """
    # Work on a WITHOUT_EOS copy so EOS tokens behave like ordinary tokens.
    beam = self.with_mode(TrieMode.WITHOUT_EOS)

    # Prune before every byte, then consume it.
    for byte in bs:
        pruned = beam.prune()
        beam = await (pruned << byte)

    # Switch back to WITH_EOS so EOS gets special handling after the prefill.
    return beam.with_mode(TrieMode.WITH_EOS)

cleanup() async

Cleans up resources used by the candidates.

Source code in genlm/bytes/byte_lm/beam.py
async def cleanup(self):
    """Run every candidate's cleanup concurrently and wait for all of them."""
    pending = [cand.cleanup() for cand in self]
    await asyncio.gather(*pending)

LazyTrieState

A lazy-evaluated state of a TokenByteTrie traversal.

This class maintains the state of a language model while traversing a trie structure, lazily evaluating probabilities and maintaining the weight of the current path through the trie for beam search.

Parameters:

Name Type Description Default
lm_state StatefulTokenizedLM

Current language model state

required
trie TokenByteTrie

Trie structure mapping tokens to byte sequences

required
node int

Current node in the trie

required
weight float

Cumulative log probability of the path to this node

required
mass ndarray

Masses for each node in the trie for the current state

None
mode TrieMode

Trie mode to use

WITH_EOS
terminated bool

Whether the state is terminated (EOS has been consumed)

False
Source code in genlm/bytes/byte_lm/trie_state.py
class LazyTrieState:
    """A lazy-evaluated state of a TokenByteTrie traversal.

    This class maintains the state of a language model while traversing a trie structure,
    lazily evaluating probabilities and maintaining the weight of the current path through the trie
    for beam search.

    Args:
        lm_state (StatefulTokenizedLM): Current language model state
        trie (TokenByteTrie): Trie structure mapping tokens to byte sequences
        node (int): Current node in the trie
        weight (float): Cumulative log probability of the path to this node
        mass (numpy.ndarray, optional): Masses for each node in the trie for the current state
        mode (TrieMode): Trie mode to use
        terminated (bool): Whether the state is terminated (EOS has been consumed)
    """

    def __init__(
        self,
        lm_state,
        trie,
        node,
        weight,
        mass=None,
        mode=TrieMode.WITH_EOS,
        terminated=False,
    ):
        self.lm_state = lm_state
        self.trie = trie
        self.node = node
        self.weight = weight
        self._mass = mass  # None until `materialize` has run
        self._extend = None  # cache for the result of `extend`
        self.mode = mode
        self.root = self.trie.trie.root
        self.children = self.trie.trie.children
        self.terminated = terminated

    @classmethod
    def initial(cls, lm, trie, mode=TrieMode.WITH_EOS):
        """Creates an initial trie state.

        Args:
            lm (genlm.backend.AsyncLM): Language model to use
            trie (TokenByteTrie): TokenByteTrie structure for byte-to-token mapping
            mode (TrieMode): Trie mode to use

        Returns:
            (LazyTrieState): Initial state at root of trie with weight 0.0
        """
        return cls(
            trie=trie,
            node=trie.trie.root,
            lm_state=StatefulTokenizedLM.initial(lm),
            weight=0.0,
            mode=mode,
        )

    @property
    def partial(self):
        """Returns the byte sequence corresponding to the current node in the trie."""
        return self.trie.trie.node2prefix[self.node]

    @property
    def mass(self):
        """Returns the log mass for each node in the trie.

        The mass at a node corresponds to the sum of the probabilities of all
        tokens which share the prefix (`self.partial`) represented by that node.

        Raises:
            ValueError: If state hasn't been materialized yet
        """
        if self._mass is None:
            raise ValueError("State is not yet materialized.")
        return self._mass

    def with_mode(self, mode):
        """Returns a new state with the given mode.

        All other fields, including any already-materialized mass, are
        carried over unchanged.
        """
        return LazyTrieState(
            lm_state=self.lm_state,
            trie=self.trie,
            node=self.node,
            weight=self.weight,
            mass=self._mass,
            mode=mode,
            terminated=self.terminated,
        )

    def actions(self):
        """Returns possible byte transitions from current node."""
        return self.children[self.node]

    def get_EOT(self):
        """Returns the end-of-token node if available from current position in the trie."""
        return self.children[self.node].get(self.trie.trie.eot_token)

    def __lshift__(self, b):
        """Transitions to a new state by consuming a byte.

        Args:
            b (int): Byte to consume

        Returns:
            (LazyTrieState|None): New state after consuming byte, or None if transition invalid (terminated or EOS)

        Raises:
            ValueError: If a transition exists but the state hasn't been materialized yet
        """
        if self.terminated:
            return None

        # Compare against None explicitly (as `extend` does) so that a valid
        # child whose node id happens to be 0 is not mistaken for "no child".
        node = self.children[self.node].get(b)
        if node is None:
            return None

        mass = self.mass
        return LazyTrieState(
            lm_state=self.lm_state,
            trie=self.trie,
            mass=mass,
            node=node,
            # Conditional log-probability of moving from the current node to `node`.
            weight=self.weight + mass[node] - mass[self.node],
            mode=self.mode,
            terminated=b == EOS,
        )

    def extend(self):
        """Extends current state by consuming an end-of-token if possible.

        The extended state is cached on first success, so repeated calls
        return the same object. The returned state is not materialized.

        Returns:
            (LazyTrieState|None): New state after consuming EOT, or None if not possible

        Raises:
            ValueError: If EOT is available but the state hasn't been materialized yet
        """
        if self._extend is None:
            if (eot_node := self.get_EOT()) is not None:
                mass = self.mass
                self._extend = LazyTrieState(
                    # Commit the finished token to the LM context ...
                    lm_state=self.lm_state
                    << int(self.trie.trie.leaf2token_id[eot_node]),
                    trie=self.trie,
                    # ... and restart the traversal from the trie root.
                    node=self.root,
                    weight=self.weight + mass[eot_node] - mass[self.node],
                    mode=self.mode,
                )
        return self._extend

    @cached_property
    def logp_next(self):
        """Computes log probabilities for next possible transitions.

        Returns:
            (LazyByteProbs): Lazy log probability distribution over possible next bytes

        Raises:
            ValueError: If the state hasn't been materialized yet
        """
        logps = np.full(258, -np.inf)  # 258 for EOT, EOS + 256 for normal bytes
        mass = self.mass
        logZ = mass[self.node]

        # A `None` action key is stored at index 256; every other key is used
        # directly as the index.
        for byte, node in self.actions().items():
            logps[byte if byte is not None else 256] = mass[node] - logZ

        return LazyByteProbs(logps)

    async def materialize(self):
        """Materializes the masses for each node in the trie for the current state.

        This makes a call to the language model and the underlying trie.
        Calling it again on an already-materialized state is a no-op.

        Returns:
            (LazyTrieState): Self with materialized masses
        """
        if self._mass is None:
            logp_next = await self.lm_state.logp_next()
            # The trie receives exponentiated log-probabilities; its result is
            # mapped back to log space before being stored.
            mass = await self.trie.weight_sum(torch.exp(logp_next), self.mode)
            log_mass = torch.log(mass)
            self._mass = log_mass.cpu().numpy()
        return self

    def __repr__(self):
        context = colors.green % ("|" + escape(bytes(self.partial)))
        if self.terminated:
            context += colors.yellow % "<EOS>"
        return f"{self.weight:.2f}: {self.lm_state}" + context

    async def cleanup(self):
        """Cleans up resources used by the trie."""
        await self.trie.cleanup()

initial(lm, trie, mode=TrieMode.WITH_EOS) classmethod

Creates an initial trie state.

Parameters:

Name Type Description Default
lm AsyncLM

Language model to use

required
trie TokenByteTrie

TokenByteTrie structure for byte-to-token mapping

required
mode TrieMode

Trie mode to use

WITH_EOS

Returns:

Type Description
LazyTrieState

Initial state at root of trie with weight 0.0

Source code in genlm/bytes/byte_lm/trie_state.py
@classmethod
def initial(cls, lm, trie, mode=TrieMode.WITH_EOS):
    """Build the starting state: trie root, fresh LM state, weight 0.0.

    Args:
        lm (genlm.backend.AsyncLM): Language model to use
        trie (TokenByteTrie): TokenByteTrie structure for byte-to-token mapping
        mode (TrieMode): Trie mode to use

    Returns:
        (LazyTrieState): Initial state at root of trie with weight 0.0
    """
    root = trie.trie.root
    lm_state = StatefulTokenizedLM.initial(lm)
    return cls(lm_state=lm_state, trie=trie, node=root, weight=0.0, mode=mode)

partial property

Returns the byte sequence corresponding to the current node in the trie.

mass property

Returns the log mass for each node in the trie.

The mass at a node corresponds to the sum of the probabilities of all tokens which share the prefix (self.partial) represented by that node.

Raises:

Type Description
ValueError

If state hasn't been materialized yet

with_mode(mode)

Returns a new state with the given mode.

Source code in genlm/bytes/byte_lm/trie_state.py
def with_mode(self, mode):
    """Copy this state, replacing only its trie mode."""
    fields = {
        "lm_state": self.lm_state,
        "trie": self.trie,
        "node": self.node,
        "weight": self.weight,
        "mass": self._mass,  # raw attribute: may still be None
        "mode": mode,
        "terminated": self.terminated,
    }
    return LazyTrieState(**fields)

actions()

Returns possible byte transitions from current node.

Source code in genlm/bytes/byte_lm/trie_state.py
def actions(self):
    """Look up the transitions available from the current trie node."""
    current = self.node
    return self.children[current]

get_EOT()

Returns the end-of-token node if available from current position in the trie.

Source code in genlm/bytes/byte_lm/trie_state.py
def get_EOT(self):
    """Look up the end-of-token child of the current node, or None if absent."""
    successors = self.children[self.node]
    return successors.get(self.trie.trie.eot_token)

__lshift__(b)

Transitions to a new state by consuming a byte.

Parameters:

Name Type Description Default
b int

Byte to consume

required

Returns:

Type Description
LazyTrieState | None

New state after consuming byte, or None if transition invalid (terminated or EOS)

Source code in genlm/bytes/byte_lm/trie_state.py
def __lshift__(self, b):
    """Transitions to a new state by consuming a byte.

    Args:
        b (int): Byte to consume

    Returns:
        (LazyTrieState|None): New state after consuming byte, or None if transition invalid (terminated or EOS)

    Raises:
        ValueError: If a transition exists but the state hasn't been materialized yet
    """
    if self.terminated:
        return None

    # Compare against None explicitly (as `extend` does) so that a valid
    # child whose node id happens to be 0 is not mistaken for "no child".
    node = self.children[self.node].get(b)
    if node is None:
        return None

    mass = self.mass
    return LazyTrieState(
        lm_state=self.lm_state,
        trie=self.trie,
        mass=mass,
        node=node,
        # Conditional log-probability of moving from the current node to `node`.
        weight=self.weight + mass[node] - mass[self.node],
        mode=self.mode,
        terminated=b == EOS,
    )

extend()

Extends current state by consuming an end-of-token if possible.

Returns:

Type Description
LazyTrieState | None

New state after consuming EOT, or None if not possible

Source code in genlm/bytes/byte_lm/trie_state.py
def extend(self):
    """End the current token (consume EOT) when the trie allows it.

    A successfully built extension is cached and returned on later calls.

    Returns:
        (LazyTrieState|None): New state after consuming EOT, or None if not possible
    """
    # Reuse the cached extension when one has already been built.
    if self._extend is not None:
        return self._extend

    eot_node = self.get_EOT()
    if eot_node is None:
        return None

    mass = self.mass
    # Append the completed token to the LM context.
    token_id = int(self.trie.trie.leaf2token_id[eot_node])
    next_lm_state = self.lm_state << token_id
    self._extend = LazyTrieState(
        lm_state=next_lm_state,
        trie=self.trie,
        node=self.root,  # restart the traversal at the trie root
        weight=self.weight + mass[eot_node] - mass[self.node],
        mode=self.mode,
    )
    return self._extend

logp_next cached property

Computes log probabilities for next possible transitions.

Returns:

Type Description
LazyByteProbs

Lazy log probability distribution over possible next bytes

materialize() async

Materializes the masses for each node in the trie for the current state.

This makes a call to the language model and the underlying trie.

Returns:

Type Description
LazyTrieState

Self with materialized masses

Source code in genlm/bytes/byte_lm/trie_state.py
async def materialize(self):
    """Compute and store the per-node log masses for this state, if missing.

    This makes a call to the language model and the underlying trie.
    Nothing is recomputed when the masses are already present.

    Returns:
        (LazyTrieState): Self with materialized masses
    """
    if self._mass is not None:
        return self

    token_logps = await self.lm_state.logp_next()
    token_probs = torch.exp(token_logps)
    node_mass = await self.trie.weight_sum(token_probs, self.mode)
    # Store the masses in log space as a NumPy array.
    self._mass = torch.log(node_mass).cpu().numpy()
    return self

cleanup() async

Cleans up resources used by the trie.

Source code in genlm/bytes/byte_lm/trie_state.py
async def cleanup(self):
    """Delegate resource cleanup to the underlying trie."""
    trie = self.trie
    await trie.cleanup()

StatefulTokenizedLM

A stateful tokenized language model that maintains context and generates next token logprobs.

Parameters:

Name Type Description Default
model AsyncLM

The underlying language model

required
context list

List of token IDs representing the current context

required
n_calls int

Number of times the model has been called

0
max_context_length int

Maximum length of context to maintain

None
Source code in genlm/bytes/byte_lm/lm_state.py
class StatefulTokenizedLM:
    """A stateful tokenized language model that maintains context and generates next token logprobs.

    Args:
        model (genlm.backend.AsyncLM): The underlying language model
        context (list): List of token IDs representing the current context
        n_calls (int): Number of times the model has been called
        max_context_length (int, optional): Maximum length of context to maintain
    """

    def __init__(self, model, context, n_calls=0, max_context_length=None):
        self.model = model
        self.context = context
        self._n_calls = n_calls
        self.max_context_length = max_context_length

    @classmethod
    def initial(cls, model, initial_context=None, max_context_length=None):
        """Creates an initial state for the language model.

        Args:
            model (genlm.backend.AsyncLM): The language model to use
            initial_context (list, optional): Initial context of token IDs. Defaults to [tokenizer.bos_token_id]
            max_context_length (int, optional): Maximum context length to maintain

        Returns:
            (StatefulTokenizedLM): A new instance with initial state
        """
        if initial_context is None:
            initial_context = [model.tokenizer.bos_token_id]
        return cls(model, initial_context, max_context_length=max_context_length)

    def __lshift__(self, token):
        """Adds a new token to the context and returns a new state.

        The receiver is left untouched. When `max_context_length` is set and
        the context is already at that length, the oldest tokens are dropped
        so the new context holds at most `max_context_length` tokens. The
        limit is carried over to the returned state.

        Args:
            token (int): Token ID to add to context

        Returns:
            (StatefulTokenizedLM): New state with updated context
        """
        assert isinstance(token, int)
        context = self.context
        limit = self.max_context_length
        if limit is not None and len(context) >= limit:
            # Work on a local slice so this state is not mutated. Guard
            # `keep == 0` explicitly because `context[-0:]` is the whole list.
            keep = limit - 1
            context = context[-keep:] if keep > 0 else []
        return StatefulTokenizedLM(
            self.model,
            context + [token],
            n_calls=self._n_calls,
            # Propagate the limit so later shifts keep truncating.
            max_context_length=limit,
        )

    async def logp_next(self):
        """Computes log probabilities for the next token given the current context.

        Each call increments this state's call counter.

        Returns:
            (torch.Tensor): Log probabilities for next tokens
        """
        self._n_calls += 1
        return await self.model.next_token_logprobs(self.context)

    def __repr__(self):
        return colors.purple % (
            "|".join([escape(self.model.byte_vocab[x]) for x in self.context])
        )

initial(model, initial_context=None, max_context_length=None) classmethod

Creates an initial state for the language model.

Parameters:

Name Type Description Default
model AsyncLM

The language model to use

required
initial_context list

Initial context of token IDs. Defaults to [tokenizer.bos_token_id]

None
max_context_length int

Maximum context length to maintain

None

Returns:

Type Description
StatefulTokenizedLM

A new instance with initial state

Source code in genlm/bytes/byte_lm/lm_state.py
@classmethod
def initial(cls, model, initial_context=None, max_context_length=None):
    """Construct the starting language-model state.

    Args:
        model (genlm.backend.AsyncLM): The language model to use
        initial_context (list, optional): Initial context of token IDs. Defaults to [tokenizer.bos_token_id]
        max_context_length (int, optional): Maximum context length to maintain

    Returns:
        (StatefulTokenizedLM): A new instance with initial state
    """
    # Without an explicit context, start from the tokenizer's BOS token.
    context = (
        [model.tokenizer.bos_token_id] if initial_context is None else initial_context
    )
    return cls(model, context, max_context_length=max_context_length)

__lshift__(token)

Adds a new token to the context and returns a new state.

Parameters:

Name Type Description Default
token int

Token ID to add to context

required

Returns:

Type Description
StatefulTokenizedLM

New state with updated context

Source code in genlm/bytes/byte_lm/lm_state.py
def __lshift__(self, token):
    """Adds a new token to the context and returns a new state.

    When `max_context_length` is set and the current context already holds
    that many tokens, only the most recent `max_context_length - 1` tokens
    are carried over so that the new context fits within the limit. The
    receiver is left untouched, and the limit is handed on to the new state
    so that it keeps being enforced on later shifts.

    Args:
        token (int): Token ID to add to context

    Returns:
        (StatefulTokenizedLM): New state with updated context
    """
    assert isinstance(token, int)
    context = self.context
    limit = self.max_context_length
    if limit is not None and len(context) >= limit:
        keep = limit - 1
        # A plain `context[-keep:]` would keep everything when keep == 0,
        # because `-0` slices from the start; handle that case explicitly.
        context = context[-keep:] if keep > 0 else []
    return StatefulTokenizedLM(
        self.model,
        context + [token],
        n_calls=self._n_calls,
        max_context_length=limit,
    )

logp_next() async

Computes log probabilities for the next token given the current context.

Returns:

Type Description
Tensor

Log probabilities for next tokens

Source code in genlm/bytes/byte_lm/lm_state.py
async def logp_next(self):
    """Ask the model for next-token log probabilities at the current context.

    The per-state call counter is bumped before the model is awaited.

    Returns:
        (torch.Tensor): Log probabilities for next tokens
    """
    self._n_calls += 1
    next_logprobs = await self.model.next_token_logprobs(self.context)
    return next_logprobs

BeamParams dataclass

Parameters for byte-level beam summing algorithm.

Parameters:

Name Type Description Default
K int

Beam width - maximum number of candidates to maintain.

required
prune_threshold float

Probability threshold for pruning candidates. Candidates with probability below this are removed. Defaults to 0.0

0.0
verbose bool

Whether to print the beam state at each step. Defaults to False

False
eos_tokens list[bytes]

List of tokens that should be treated as EOS. When configured, EOS tokens will terminate generation when sampled. Defaults to None

None
heal bool

Whether to enable adaptive token healing. Defaults to True

True
heal_max_backoff int

Maximum number of bytes to back off when healing. Defaults to None

None
heal_max_splits int

Maximum number of intra-suffix commits allowed during a single healing attempt. Defaults to None

None
Source code in genlm/bytes/byte_lm/beam.py
@dataclass
class BeamParams:
    """Parameters for byte-level beam summing algorithm.

    Args:
        K (int): Beam width - maximum number of candidates to maintain.
        prune_threshold (float, optional): Probability threshold for pruning candidates.
            Candidates with probability below this are removed. Defaults to 0.0
        verbose (bool, optional): Whether to print the beam state at each step. Defaults to False
        eos_tokens (list[bytes], optional): List of tokens that should be treated as EOS. When configured,
            EOS tokens will terminate generation when sampled. Defaults to None
        heal (bool, optional): Whether to enable adaptive token healing. Defaults to True
        heal_max_backoff (int, optional): Maximum number of bytes to back off when healing. Defaults to None
        heal_max_splits (int, optional): Maximum number of intra-suffix commits allowed during a single healing attempt. Defaults to None
    """

    K: int
    prune_threshold: float = 0.0
    verbose: bool = False
    eos_tokens: list[bytes] = None
    heal: bool = True
    heal_max_backoff: int | None = None
    # Optional cap on how many intra-partial commits are allowed during a
    # single healing attempt. None means unlimited. Set to 0 to disable
    # multi-split behavior (i.e., single-split only).
    heal_max_splits: int | None = None

    def __post_init__(self):
        if self.prune_threshold < 0:
            raise ValueError(
                f"prune_threshold must be non-negative, got {self.prune_threshold}"
            )
        self.log_prune_threshold = (
            np.log(self.prune_threshold) if self.prune_threshold > 0 else -np.inf
        )
        self.eos_tokens = set(self.eos_tokens) if self.eos_tokens else set()

TokenByteTrie

A trie data structure for efficient token-to-byte mapping.

Source code in genlm/bytes/trie.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
class TokenByteTrie:
    """A trie data structure for efficient token-to-byte mapping.

    Every vocabulary entry is inserted as a root-to-leaf path whose final edge
    is labelled with the end-of-token marker, and one extra node hanging off
    the root under the `EOS` label stands for end-of-sequence. Sparse
    leaf-to-ancestor matrices are precomputed at construction time so that
    per-token weights can be aggregated onto every node of the trie.
    """

    def __init__(
        self,
        decode,
        device=None,
        atomic_tokens=None,
        eot_token=None,
        eos_tokens=None,
        max_batch_size=64,
    ):
        """Initialize a `TokenByteTrie`.

        Args:
            decode (list[bytes]): List representing the token vocabulary.
            device (str, optional): Device to use for weight sum and max computations ('cpu' or 'cuda').
                When falsy, 'cuda' is chosen if available, otherwise 'cpu'.
            atomic_tokens (list[bytes], optional): List of tokens that should be treated as atomic units rather than being split into bytes.
            eot_token (bytes|None, optional): End-of-token token. Default is None, which represents EOT as None.
            eos_tokens (set[bytes], optional): Set of tokens that should be treated as EOS (End of Sequence).
            max_batch_size (int, optional): Maximum batch size for weight sum sparse matrix multiplication.

        Raises:
            ValueError: If the resolved device is neither 'cpu' nor 'cuda', if an atomic or
                EOS token is missing from `decode`, or if `decode` contains a duplicate.
        """
        self.decode = decode
        self.max_batch_size = max_batch_size

        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        if self.device not in ["cpu", "cuda"]:
            raise ValueError(f"Invalid device: {device}. Must be 'cpu', 'cuda' or None")

        self.eot_token = eot_token
        self.eos_tokens = set(eos_tokens or [])
        self.eos_token_ids = [
            i for i, token in enumerate(decode) if token in self.eos_tokens
        ]

        # The build steps depend on each other and must run in this order:
        # node2prefix and the reachability matrices use the renumbered node IDs.
        self._build_trie(atomic_tokens or [])
        self._renumber()
        self._build_node2prefix()
        self._build_reachability_matrix()
        self.token_ids = torch.tensor(
            self.token_id_to_leaf[:, 0], dtype=torch.long, device=self.device
        )

    def _build_trie(self, atomic_tokens):
        """Builds the trie from the vocabulary in `self.decode`.

        Populates `word2leaf`, `children`, `root`, `token_id_to_leaf`, `lookup`,
        `eos_node`, `leaf2word` and `jump` on the instance; nothing is returned.

        Args:
            atomic_tokens (list[bytes]): Tokens inserted as a single edge instead of
                one edge per element of the token.

        Raises:
            ValueError: If an atomic or EOS token is not in the vocabulary, or if the
                vocabulary contains a duplicate word.
        """
        for token in atomic_tokens:
            if token not in self.decode:
                raise ValueError(f"Atomic token {token} not in vocabulary")

        for token in self.eos_tokens:
            if token not in self.decode:
                raise ValueError(f"EOS token {token} not in vocabulary")

        self.word2leaf = {}
        self.children = [{}]  # First node is root
        self.root = 0
        self.token_id_to_leaf = []
        self.lookup = {}

        for token_id, word in enumerate(self.decode):
            if word in self.lookup:
                raise ValueError(f"Duplicate word in vocabulary: {word}")
            self.lookup[word] = token_id

            # Build ALL tokens in trie (including EOS tokens for conditioning mode)
            curr = self.root
            # Atomic tokens become one edge labelled with the whole token;
            # other tokens contribute one edge per element.
            letters = [word] if word in atomic_tokens else word
            for letter in letters:
                if letter not in self.children[curr]:
                    self.children[curr][letter] = len(self.children)
                    self.children.append({})
                curr = self.children[curr][letter]

            # Terminate the token's path with an end-of-token edge to a fresh leaf.
            self.children[curr][self.eot_token] = last = len(self.children)
            self.children.append({})
            assert word not in self.word2leaf
            self.word2leaf[word] = last
            self.token_id_to_leaf.append((token_id, last))

        self.eos_node = len(self.children)
        self.children.append({})  # Create the EOS node
        self.children[self.root][EOS] = self.eos_node

        self.leaf2word = dict(zip(self.word2leaf.values(), self.word2leaf.keys()))
        self.jump = [
            np.array(sorted(x.values()), dtype=np.int32) for x in self.children
        ]

    def _renumber(self):
        """Renumber the states of the trie so that they are named by a contiguous
        range of integers and those integers respect the topological ordering
        of the trie. This improves the efficiency of updating the trie as
        it improves memory locality.

        The new index of a node is its position in the children-first traversal
        produced by `_order_full`, so every child gets a smaller index than its parent.
        """
        self.ordering = np.array(list(self._order(self.root)), np.int32)
        ordering = {}
        for i, x in enumerate(self._order_full(self.root)):
            ordering[x] = i
        self._rename(f=lambda x: ordering[x])

    def _order(self, node):
        """Generate a children-first ordering of the nodes beneath the given node,
        without descending through edges labelled `None`.

        Args:
            node (int): Starting node index

        Yields:
            int: Node indices, each node after the descendants that were visited
        """
        for a in self.children[node]:
            if a is not None:
                yield from self._order(self.children[node][a])
        yield node

    def _order_full(self, node):
        """Generate a children-first ordering of all nodes beneath the given node,
        following every edge (including `None`-labelled ones).

        Args:
            node (int): Starting node index

        Yields:
            (int): Node indices, each node after all of its descendants
        """
        for a in self.children[node]:
            yield from self._order_full(self.children[node][a])
        yield node

    def _rename(self, f):
        """Rename all node indices in the trie using the provided mapping function.

        Rewrites `children`, `root`, `word2leaf`, `leaf2word`, `token_id_to_leaf`
        (converted to an int32 array), `leaf2token_id`, `ordering`, `jump` and
        `eos_node` in terms of the new indices.

        Args:
            f (callable): Function that maps old node indices to new node indices
        """
        N = len(self.children)

        new_children = [{} for _ in range(N)]
        nodes = range(N)

        for x in nodes:
            for letter, y in self.children[x].items():
                new_children[f(x)][letter] = f(y)

        self.root = f(self.root)
        self.children = new_children
        self.word2leaf = {w: f(x) for w, x in self.word2leaf.items()}
        self.leaf2word = dict(zip(self.word2leaf.values(), self.word2leaf.keys()))

        self.token_id_to_leaf = np.array(
            [(i, f(x)) for i, x in self.token_id_to_leaf], dtype=np.int32
        )
        self.leaf2token_id = dict(
            zip(self.token_id_to_leaf[:, 1], self.token_id_to_leaf[:, 0])
        )

        self.ordering = np.array([f(x) for x in self.ordering])
        self.jump = [np.array(sorted(x.values()), dtype=np.int32) for x in new_children]

        # Update EOS node after renumbering
        self.eos_node = f(self.eos_node)

    def _build_node2prefix(self):
        """Builds a mapping from each node to its prefix and stores it in `self.node2prefix`.

        The mapping is a dict from node ID to the list of edge labels on the path
        from the root. `None`-labelled edges add nothing, and a `bytes` label is
        expanded into its individual elements. Nothing is returned.
        """
        node2prefix = {self.root: []}
        # After renumbering, parents have larger indices than their children,
        # so walking indices downwards reaches each parent before its children.
        for x in reversed(range(len(self.children))):
            for letter, y in self.children[x].items():
                if letter is None:
                    node2prefix[y] = node2prefix[x]
                elif isinstance(letter, bytes):
                    node2prefix[y] = node2prefix[x] + list(letter)
                else:
                    node2prefix[y] = node2prefix[x] + [letter]

        self.node2prefix = node2prefix

    def _build_parent_map(self):
        """Builds a mapping from each node to its parent node in the trie.

        Returns:
            (dict): A dictionary where keys are child nodes and values are their parent nodes.
        """
        parent = {}
        for node in range(len(self.children)):
            for child in self.jump[node]:
                parent[child] = node
        return parent

    def _build_reachability_matrix(self):
        """Constructs two sparse reachability matrices for efficient weight propagation.

        Rows follow the order of `token_id_to_leaf`; columns are trie nodes.

        `M_no_eos[i, j] = 1` if node j is either:
        - The leaf node of row i itself (self-connection)
        - An ancestor of that leaf in the trie

        `M_with_eos` is the same for non-EOS tokens, but rows of EOS tokens are
        connected only to `eos_node` and the root.

        Also stores `M` (alias of `M_no_eos`) and the row/column index tensors of
        the no-EOS matrix as `src_indices` / `dst_indices`.
        """
        leaf_indices = self.token_id_to_leaf[:, 1]
        parent = self._build_parent_map()
        # Build no_eos matrix (includes all tokens, doesn't map any tokens to the eos_node)
        rows_no_eos, cols_no_eos = [], []
        # Build with_eos matrix (maps EOS tokens to the eos_node only)
        rows_with_eos, cols_with_eos = [], []

        for i, node in enumerate(leaf_indices):
            token_id = self.token_id_to_leaf[i, 0]
            token = self.decode[token_id]

            # self-connection
            rows_no_eos.append(i)
            cols_no_eos.append(node)
            if token not in self.eos_tokens:
                rows_with_eos.append(i)
                cols_with_eos.append(node)
            else:
                # EOS tokens: contribute directly to eos_node and root
                rows_with_eos.append(i)
                cols_with_eos.append(self.eos_node)
                rows_with_eos.append(i)
                cols_with_eos.append(self.root)

            # Walk up to the root, connecting the leaf to every ancestor.
            current = node
            while current in parent:
                ancestor = parent[current]
                rows_no_eos.append(i)
                cols_no_eos.append(ancestor)
                if token not in self.eos_tokens:
                    rows_with_eos.append(i)
                    cols_with_eos.append(ancestor)
                current = ancestor

        # Build without_eos matrix
        indices_no_eos = torch.tensor(
            [rows_no_eos, cols_no_eos], dtype=torch.long, device=self.device
        )
        values_no_eos = torch.ones(len(rows_no_eos), device=self.device)
        self.M_no_eos = torch.sparse_coo_tensor(
            indices_no_eos, values_no_eos, (len(leaf_indices), len(self.children))
        ).to_sparse_csr()

        # Build with_eos matrix
        indices_with_eos = torch.tensor(
            [rows_with_eos, cols_with_eos], dtype=torch.long, device=self.device
        )
        values_with_eos = torch.ones(len(rows_with_eos), device=self.device)
        self.M_with_eos = torch.sparse_coo_tensor(
            indices_with_eos, values_with_eos, (len(leaf_indices), len(self.children))
        ).to_sparse_csr()

        # Keep the old matrix for backward compatibility
        self.M = self.M_no_eos
        # Row/column pairs of the no-EOS matrix, reused by `batch_weight_max`.
        self.src_indices = torch.tensor(
            rows_no_eos, dtype=torch.long, device=self.device
        )
        self.dst_indices = torch.tensor(
            cols_no_eos, dtype=torch.long, device=self.device
        )

    def _preprocess_ws(self, batch_ws):
        """Preprocess weight sums for batch processing.

        Each entry is converted to a float32 tensor on `self.device` and its first
        dimension is asserted to equal `len(self.decode)`.

        Args:
            batch_ws (list|np.ndarray|torch.Tensor): List of weight sum tensors or lists of weight sums.

        Returns:
            (torch.Tensor): Stacked weight sum tensor.
        """
        processed_batch_ws = []
        for ws in batch_ws:
            if not isinstance(ws, torch.Tensor):
                ws = torch.tensor(ws, device=self.device, dtype=torch.float32)
            elif ws.device != self.device or ws.dtype != torch.float32:
                ws = ws.to(device=self.device, dtype=torch.float32)
            assert ws.shape[0] == len(self.decode), [ws.shape[0], len(self.decode)]
            processed_batch_ws.append(ws)
        return torch.stack(processed_batch_ws)

    def weight_sum(self, ws, mode=None):
        """Computes the sum of weights of all leaf nodes (tokens) that are descendants of each node in the trie.

        Args:
            ws (torch.Tensor): Token weights, shape (`len(self.decode)`,).
            mode (TrieMode, optional): Trie mode - determines matrix selection.
                                     If None, defaults to WITHOUT_EOS.

        Returns:
            (torch.Tensor): Summed weights for each node in the trie, shape (num_nodes,).
        """
        mode = mode or TrieMode.WITHOUT_EOS
        return self.batch_weight_sum(self._preprocess_ws([ws]), mode=mode)[0]

    def batch_weight_sum(self, ws, mode=None):
        """Batch version of `weight_sum`.

        Args:
            ws (torch.Tensor): Batch of token weights, shape (batch_size × `len(self.decode)`).
            mode (TrieMode, optional): Trie mode - determines matrix selection.
                                     If None, defaults to WITHOUT_EOS.

        Returns:
            (torch.Tensor): Summed weights for each node in the trie, shape (batch_size × num_nodes).
        """
        mode = mode or TrieMode.WITHOUT_EOS

        ws = self._preprocess_ws(ws)
        batch_size = ws.shape[0]
        all_masses = []

        # Choose matrix based on mode
        matrix = self.M_with_eos if mode == TrieMode.WITH_EOS else self.M_no_eos

        # If you are getting illegal memory access errors here,
        # try reducing the max_batch_size.
        for i in range(0, batch_size, self.max_batch_size):
            batch_ws = ws[i : i + self.max_batch_size]
            # Reorder columns into leaf order before multiplying by the reachability matrix.
            masses = torch.sparse.mm(batch_ws[:, self.token_ids], matrix)
            all_masses.append(masses)
        return torch.cat(all_masses, dim=0)

    def weight_max(self, ws):
        """Computes the maximum weight of all descendant leaf nodes (tokens) for each node in the trie.

        Args:
            ws (torch.Tensor): Token weights, shape (`len(self.decode)`,).

        Returns:
            (torch.Tensor): Maximum weights for each node in the trie, shape (num_nodes,).
        """
        return self.batch_weight_max(self._preprocess_ws([ws]))[0]

    def batch_weight_max(self, ws):
        """Batch version of `weight_max`.

        Args:
            ws (torch.Tensor): Batch of token weights, shape (batch_size × `len(self.decode)`).

        Returns:
            (torch.Tensor): Maximum weights for each node in the trie, shape (batch_size × num_nodes).
                Nodes that no leaf is connected to keep the initial value 0.
        """
        ws = self._preprocess_ws(ws)

        # Get leaf weights
        leaf_weights = ws[:, self.token_ids]  # shape: (batch_size × num_leafs)
        batch_size = leaf_weights.shape[0]

        # Use scatter_reduce to propagate maximum values in parallel
        result = torch.zeros((batch_size, len(self.children)), device=self.device)
        result.scatter_reduce_(
            dim=1,
            index=self.dst_indices.expand(batch_size, -1),
            src=leaf_weights[:, self.src_indices],
            reduce="amax",
            include_self=False,
        )

        return result

    def visualize(self, ws=None):
        """Visualize the trie structure using Graphviz.

        Args:
            ws (np.ndarray|None): Optional weight vector to display at each node. Should be of length `len(self.children)`.

        Returns:
            (graphviz.Digraph): The generated graph object

        Raises:
            ImportError: If the `graphviz` package is not installed.
            ValueError: If `ws` is given and its length differs from the number of nodes.
        """
        try:
            import graphviz
        except ImportError:  # pragma: no cover
            raise ImportError(
                "Please install graphviz: pip install graphviz"
            )  # pragma: no cover

        if ws is not None and len(ws) != len(self.children):
            raise ValueError(
                f"Weight vector length ({len(ws)}) must match number of nodes ({len(self.children)})"
            )

        dot = graphviz.Digraph(comment="Token Character Trie")
        dot.attr(rankdir="LR")

        # Create a subgraph for the legend
        with dot.subgraph(name="cluster_legend") as legend:
            legend.attr(label="Legend", fontsize="10")
            legend.attr("node", fontsize="7", width="0.1", height="0.1")

            # Example internal node
            legend.node(
                "legend_internal",
                "Internal Node ID\n'Prefix'\nWeight (if provided)",
                shape="circle",
            )

            # Example leaf node
            legend.node("legend_leaf", "Complete Token", shape="doublecircle")

            legend.edge(
                "legend_internal",
                "legend_leaf",
                label="Token item",
                fontsize="10",
            )

            # Align legend horizontally
            legend.attr(rankdir="TB")
            legend.attr(rank="same")

        # Add the main trie nodes and edges
        for node_id in range(len(self.children)):
            prefix = self.node2prefix[node_id]

            if ws is not None:
                label = f"{node_id}\n'{prefix}'\n{ws[node_id]:.4f}"
            else:
                label = f"{node_id}\n'{prefix}'"

            # Color nodes based on mass if provided
            if ws is not None:
                max_ws = ws.max()
                if max_ws > 0:
                    # Heavier nodes get a more saturated green fill.
                    intensity = int(255 * (1 - ws[node_id] / max_ws))
                    color = f"#{intensity:02x}{255:02x}{intensity:02x}"
                else:
                    color = "#ffffff"  # white for zero mass
            else:
                color = "#ffffff"  # default white

            if node_id in self.leaf2word:
                dot.node(
                    str(node_id),
                    label,
                    shape="doublecircle",
                    style="filled",
                    fillcolor=color,
                )
            else:
                dot.node(
                    str(node_id), label, shape="circle", style="filled", fillcolor=color
                )

        for node_id, children in enumerate(self.children):
            for char, child_id in children.items():
                if char is not None:
                    edge_label = str(char)
                else:
                    edge_label = "End-of-Token"

                dot.edge(str(node_id), str(child_id), label=edge_label)

        return dot

__init__(decode, device=None, atomic_tokens=None, eot_token=None, eos_tokens=None, max_batch_size=64)

Initialize a TokenByteTrie.

Parameters:

Name Type Description Default
decode list[bytes]

List representing the token vocabulary.

required
device str

Device to use for weight sum and max computations ('cpu' or 'cuda').

None
atomic_tokens list[bytes]

List of tokens that should be treated as atomic units rather than being split into bytes.

None
eot_token bytes | None

End-of-token token. Default is None, which represents EOT as None.

None
eos_tokens set[bytes]

Set of tokens that should be treated as EOS (End of Sequence).

None
max_batch_size int

Maximum batch size for weight sum sparse matrix multiplication.

64
Source code in genlm/bytes/trie.py
def __init__(
    self,
    decode,
    device=None,
    atomic_tokens=None,
    eot_token=None,
    eos_tokens=None,
    max_batch_size=64,
):
    """Set up a `TokenByteTrie` over the given vocabulary.

    Args:
        decode (list[bytes]): List representing the token vocabulary.
        device (str, optional): Device to use for weight sum and max computations ('cpu' or 'cuda').
            When falsy, 'cuda' is used if available and 'cpu' otherwise.
        atomic_tokens (list[bytes], optional): List of tokens that should be treated as atomic units rather than being split into bytes.
        eot_token (bytes|None, optional): End-of-token token. Default is None, which represents EOT as None.
        eos_tokens (set[bytes], optional): Set of tokens that should be treated as EOS (End of Sequence).
        max_batch_size (int, optional): Maximum batch size for weight sum sparse matrix multiplication.

    Raises:
        ValueError: If the resolved device is neither 'cpu' nor 'cuda'.
    """
    self.decode = decode
    self.max_batch_size = max_batch_size

    # Resolve the device, probing for CUDA only when none was requested.
    if device:
        self.device = device
    elif torch.cuda.is_available():
        self.device = "cuda"
    else:
        self.device = "cpu"
    if self.device not in ("cpu", "cuda"):
        raise ValueError(f"Invalid device: {device}. Must be 'cpu', 'cuda' or None")

    self.eot_token = eot_token
    self.eos_tokens = set(eos_tokens or [])
    eos_ids = []
    for index, entry in enumerate(decode):
        if entry in self.eos_tokens:
            eos_ids.append(index)
    self.eos_token_ids = eos_ids

    # Construction pipeline; each step relies on the state left by the previous one.
    self._build_trie(atomic_tokens or [])
    self._renumber()
    self._build_node2prefix()
    self._build_reachability_matrix()

    token_id_column = self.token_id_to_leaf[:, 0]
    self.token_ids = torch.tensor(
        token_id_column, dtype=torch.long, device=self.device
    )

weight_sum(ws, mode=None)

Computes the sum of weights of all leaf nodes (tokens) that are descendants of each node in the trie.

Parameters:

Name Type Description Default
ws Tensor

Token weights, shape (len(self.decode),).

required
mode TrieMode

Trie mode - determines matrix selection. If None, defaults to WITHOUT_EOS.

None

Returns:

Type Description
Tensor

Summed weights for each node in the trie, shape (num_nodes,).

Source code in genlm/bytes/trie.py
def weight_sum(self, ws, mode=None):
    """Sum, for every trie node, the weights of the tokens (leaves) beneath it.

    This is the single-input form of `batch_weight_sum`: the weights are wrapped
    in a batch of one and the first row of the batched result is returned.

    Args:
        ws (torch.Tensor): Token weights, shape (`len(self.decode)`,).
        mode (TrieMode, optional): Trie mode - determines matrix selection.
                                 If None, defaults to WITHOUT_EOS.

    Returns:
        (torch.Tensor): Summed weights for each node in the trie, shape (num_nodes,).
    """
    if not mode:
        mode = TrieMode.WITHOUT_EOS
    single_batch = self._preprocess_ws([ws])
    summed = self.batch_weight_sum(single_batch, mode=mode)
    return summed[0]

batch_weight_sum(ws, mode=None)

Batch version of weight_sum.

Parameters:

Name Type Description Default
ws Tensor

Batch of token weights, shape (batch_size × len(self.decode)).

required
mode TrieMode

Trie mode - determines matrix selection. If None, defaults to WITHOUT_EOS.

None

Returns:

Type Description
Tensor

Summed weights for each node in the trie, shape (batch_size × num_nodes).

Source code in genlm/bytes/trie.py
def batch_weight_sum(self, ws, mode=None):
    """Batch version of `weight_sum`.

    The batch is processed in slices of at most `self.max_batch_size` rows; each
    slice is reordered into leaf order and multiplied by the sparse reachability
    matrix selected by `mode`.

    Args:
        ws (torch.Tensor): Batch of token weights, shape (batch_size × `len(self.decode)`).
        mode (TrieMode, optional): Trie mode - determines matrix selection.
                                 If None, defaults to WITHOUT_EOS.

    Returns:
        (torch.Tensor): Summed weights for each node in the trie, shape (batch_size × num_nodes).
    """
    if not mode:
        mode = TrieMode.WITHOUT_EOS

    weights = self._preprocess_ws(ws)
    n_rows = weights.shape[0]

    # Pick the reachability matrix that matches the requested EOS handling.
    if mode == TrieMode.WITH_EOS:
        reach = self.M_with_eos
    else:
        reach = self.M_no_eos

    # If you are getting illegal memory access errors here,
    # try reducing the max_batch_size.
    per_chunk = []
    for start in range(0, n_rows, self.max_batch_size):
        stop = start + self.max_batch_size
        leaf_weights = weights[start:stop][:, self.token_ids]
        per_chunk.append(torch.sparse.mm(leaf_weights, reach))
    return torch.cat(per_chunk, dim=0)

weight_max(ws)

Computes the maximum weight of all descendant leaf nodes (tokens) for each node in the trie.

Parameters:

Name Type Description Default
ws Tensor

Token weights, shape (len(self.decode),).

required

Returns:

Type Description
Tensor

Maximum weights for each node in the trie, shape (num_nodes,).

Source code in genlm/bytes/trie.py
def weight_max(self, ws):
    """Find, for every trie node, the largest weight among the tokens (leaves) beneath it.

    This is the single-input form of `batch_weight_max`: the weights are wrapped
    in a batch of one and the first row of the batched result is returned.

    Args:
        ws (torch.Tensor): Token weights, shape (`len(self.decode)`,).

    Returns:
        (torch.Tensor): Maximum weights for each node in the trie, shape (num_nodes,).
    """
    single_batch = self._preprocess_ws([ws])
    maxima = self.batch_weight_max(single_batch)
    return maxima[0]

batch_weight_max(ws)

Batch version of weight_max.

Parameters:

Name Type Description Default
ws Tensor

Batch of token weights, shape (batch_size × len(self.decode)).

required

Returns:

Type Description
Tensor

Maximum weights for each node in the trie, shape (batch_size × num_nodes).

Source code in genlm/bytes/trie.py
def batch_weight_max(self, ws):
    """Batch version of `weight_max`.

    Leaf weights are scattered onto the node columns listed in `self.dst_indices`
    (paired with the leaf rows in `self.src_indices`) using an "amax" reduction.
    Nodes that receive no contribution keep the initial value 0.

    Args:
        ws (torch.Tensor): Batch of token weights, shape (batch_size × `len(self.decode)`).

    Returns:
        (torch.Tensor): Maximum weights for each node in the trie, shape (batch_size × num_nodes).
    """
    weights = self._preprocess_ws(ws)

    # Columns reordered into leaf order: (batch_size × num_leafs)
    per_leaf = weights[:, self.token_ids]
    n_rows = per_leaf.shape[0]
    n_nodes = len(self.children)

    node_max = torch.zeros((n_rows, n_nodes), device=self.device)
    targets = self.dst_indices.expand(n_rows, -1)
    contributions = per_leaf[:, self.src_indices]
    # The in-place scatter returns the tensor it modified.
    return node_max.scatter_reduce_(
        dim=1,
        index=targets,
        src=contributions,
        reduce="amax",
        include_self=False,
    )

visualize(ws=None)

Visualize the trie structure using Graphviz.

Parameters:

Name Type Description Default
ws ndarray | None

Optional weight vector to display at each node. Should be of length len(self.children).

None

Returns:

Type Description
Digraph

The generated graph object

Source code in genlm/bytes/trie.py
def visualize(self, ws=None):
    """Visualize the trie structure using Graphviz.

    Every node is drawn with its ID and prefix (and its weight, when `ws` is
    given); nodes that terminate a token are drawn as double circles. When
    weights are supplied and the maximum is positive, nodes are shaded green in
    proportion to `ws[node] / ws.max()`; otherwise they are white.

    Args:
        ws (np.ndarray|None): Optional weight vector to display at each node. Should be of length `len(self.children)`.

    Returns:
        (graphviz.Digraph): The generated graph object

    Raises:
        ImportError: If the `graphviz` package is not installed.
        ValueError: If `ws` is given and its length differs from the number of nodes.
    """
    try:
        import graphviz
    except ImportError:  # pragma: no cover
        raise ImportError(
            "Please install graphviz: pip install graphviz"
        )  # pragma: no cover

    if ws is not None and len(ws) != len(self.children):
        raise ValueError(
            f"Weight vector length ({len(ws)}) must match number of nodes ({len(self.children)})"
        )

    dot = graphviz.Digraph(comment="Token Character Trie")
    dot.attr(rankdir="LR")

    # Create a subgraph for the legend
    with dot.subgraph(name="cluster_legend") as legend:
        legend.attr(label="Legend", fontsize="10")
        legend.attr("node", fontsize="7", width="0.1", height="0.1")

        # Example internal node
        legend.node(
            "legend_internal",
            "Internal Node ID\n'Prefix'\nWeight (if provided)",
            shape="circle",
        )

        # Example leaf node
        legend.node("legend_leaf", "Complete Token", shape="doublecircle")

        legend.edge(
            "legend_internal",
            "legend_leaf",
            label="Token item",
            fontsize="10",
        )

        # Align legend horizontally
        legend.attr(rankdir="TB")
        legend.attr(rank="same")

    # The maximum is the same for every node, so compute it once instead of
    # once per node (which made the loop quadratic in the number of nodes).
    max_ws = ws.max() if ws is not None and len(ws) > 0 else None

    # Add the main trie nodes and edges
    for node_id in range(len(self.children)):
        prefix = self.node2prefix[node_id]

        label = f"{node_id}\n'{prefix}'"
        color = "#ffffff"  # default white; also used when the total mass is zero
        if ws is not None:
            label += f"\n{ws[node_id]:.4f}"
            if max_ws > 0:
                # Heavier nodes get a more saturated green fill.
                intensity = int(255 * (1 - ws[node_id] / max_ws))
                color = f"#{intensity:02x}{255:02x}{intensity:02x}"

        # Nodes that terminate a token are drawn as double circles.
        shape = "doublecircle" if node_id in self.leaf2word else "circle"
        dot.node(
            str(node_id),
            label,
            shape=shape,
            style="filled",
            fillcolor=color,
        )

    for node_id, children in enumerate(self.children):
        for char, child_id in children.items():
            # A None label marks the end-of-token edge.
            edge_label = str(char) if char is not None else "End-of-Token"
            dot.edge(str(node_id), str(child_id), label=edge_label)

    return dot

AsyncTokenByteTrie

An asynchronous wrapper for TokenByteTrie implementations that provides automatic request batching.

Source code in genlm/bytes/trie.py
class AsyncTokenByteTrie:
    """An asynchronous wrapper for TokenByteTrie implementations that provides automatic request batching.

    Requests are pushed onto an `asyncio.Queue` and served by one background
    task. Each time the task wakes up it drains everything currently queued and
    issues a single batched trie call per (operation, mode) pair.
    """

    def __init__(self, trie):
        """Initialize an `AsyncTokenByteTrie`.

        Args:
            trie (TokenByteTrie): The underlying `TokenByteTrie` instance
        """
        self.trie = trie
        # Both are created by `start()`, which runs on the first queued request.
        self._queue = None
        self._task = None

    @classmethod
    def from_vocab(cls, vocab, **kwargs):
        """Creates an `AsyncTokenByteTrie` from a vocabulary.

        Args:
            vocab (list): The vocabulary over which the trie will be defined.
            **kwargs (dict): Additional arguments passed to the trie constructor.
                             Can include 'eos_tokens' for EOS support.

        Returns:
            (AsyncTokenByteTrie): The initialized asynchronous trie instance.
        """
        trie = TokenByteTrie(decode=vocab, **kwargs)
        return cls(trie)

    def _queue_request(self, ws, mode, op):
        """Enqueue one request and return the future that will carry its result.

        Starts (or restarts) the background task when none is running. Requires
        a running event loop, since the future is created on it.

        Args:
            ws (torch.Tensor): Token weights for the request.
            mode (TrieMode): Trie mode the request is grouped under.
            op (TrieOp): Operation to perform (`TrieOp.SUM` or `TrieOp.MAX`).

        Returns:
            (asyncio.Future): Completed by the background task with the result,
                or with the exception raised while processing the batch.
        """
        if not self._task or self._task.done():
            self.start()

        future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait(((ws, mode), future, op))
        return future

    async def weight_sum(self, ws, mode=None):
        """Queue a `weight_sum` request. Multiple concurrent calls will be automatically batched
        together by (operation, mode) pairs.

        Args:
            ws (torch.Tensor): Token weights, shape (`len(self.trie.decode)`,).
            mode (TrieMode, optional): Trie mode determining EOS handling. Defaults to WITHOUT_EOS.

        Returns:
            (np.ndarray): The calculated mass sums for the given distribution.
        """
        mode = mode or TrieMode.WITHOUT_EOS
        return await self._queue_request(ws, mode, TrieOp.SUM)

    async def weight_max(self, ws):
        """Queue a `weight_max` request. Multiple concurrent calls will be automatically batched
        together.

        Args:
            ws (torch.Tensor): Token weights, shape (`len(self.trie.decode)`,).

        Returns:
            (np.ndarray): The calculated max weights for the given distribution.
        """
        # For MAX, mode doesn't matter so use WITHOUT_EOS as default
        return await self._queue_request(ws, TrieMode.WITHOUT_EOS, TrieOp.MAX)

    def start(self):
        """Start the background processing task if not already running."""
        if not self._task or self._task.done():
            logger.debug("starting background loop")
            # Create a new queue so that it is bound to the current event loop
            self._queue = asyncio.Queue()
            self._task = asyncio.create_task(self._background_loop())

    async def _background_loop(self):
        """Background task that processes queued weight sum and max requests.

        Continuously monitors the queue for new requests and processes them in batches
        grouped by (operation, mode) pairs using the underlying trie implementation.

        Futures that are already done when their result becomes available (for
        example because the awaiting caller was cancelled) are skipped.

        Raises:
            (Exception): If any error occurs during processing, it is propagated to all
                         pending futures in the current batch.
        """
        while True:
            try:
                # Group by (operation, mode) pairs for efficient batching
                op_mode_groups = defaultdict(list)

                # Block until at least one request arrives ...
                (ws, mode), future, op = await self._queue.get()
                op_mode_groups[(op, mode)].append(((ws, mode), future))

                # ... then take whatever else is already waiting, without blocking.
                try:
                    while True:
                        (ws, mode), future, op = self._queue.get_nowait()
                        op_mode_groups[(op, mode)].append(((ws, mode), future))
                except asyncio.QueueEmpty:
                    pass

                for (op, mode), group in op_mode_groups.items():
                    requests, futures = zip(*group)
                    # Extract just the ws tensors from the (ws, mode) tuples
                    ws_list = [req[0] for req in requests]

                    if op == TrieOp.SUM:
                        if logger.isEnabledFor(logging.DEBUG):
                            logger.debug(
                                f"processing {len(ws_list)} sum requests with mode {mode}"
                            )  # pragma: no cover
                        results = self.trie.batch_weight_sum(ws_list, mode=mode)
                    elif op == TrieOp.MAX:
                        if logger.isEnabledFor(logging.DEBUG):
                            logger.debug(
                                f"processing {len(ws_list)} max requests"
                            )  # pragma: no cover
                        # MAX operations don't need mode, so use the original batch_weight_max
                        results = self.trie.batch_weight_max(ws_list)
                    else:
                        raise ValueError(f"Unknown trie operation: {op}")

                    for future, result in zip(futures, results):
                        # A caller cancelled while its request was queued leaves
                        # a cancelled future behind. `set_result` on a finished
                        # future raises `InvalidStateError`, which would fail
                        # every other request in the batch and stop the loop.
                        if not future.done():
                            future.set_result(result)

            except Exception as e:
                # Fail every request of this batch that has not been answered
                # yet, then let the task die; the next request restarts it.
                for group in op_mode_groups.values():
                    for _, future in group:
                        if not future.done():
                            future.set_exception(e)
                raise

    async def cleanup(self):
        """Async cleanup - preferred method

        Cancels the background task, if one is still running, and waits for
        the cancellation to complete before clearing the task reference.
        """
        if self._task and not self._task.done():
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    def shutdown(self):
        """Stop the background processing task and cleanup resources.

        Requests cancellation without waiting for the task to finish.
        """
        if self._task is not None:
            try:
                self._task.cancel()
            except RuntimeError:  # pragma: no cover
                # Ignore runtime errors that might occur if event loop is closed
                pass
            self._task = None

    def __del__(self):
        """Cancel the background task when the wrapper is garbage-collected."""
        self.shutdown()

__init__(trie)

Initialize an AsyncTokenByteTrie.

Parameters:

Name Type Description Default
trie TokenByteTrie

The underlying TokenByteTrie instance

required
Source code in genlm/bytes/trie.py
def __init__(self, trie):
    """Set up an `AsyncTokenByteTrie` around an existing trie.

    Args:
        trie (TokenByteTrie): The underlying `TokenByteTrie` instance
    """
    # The request queue and the background task both start out unset.
    self._queue = self._task = None
    self.trie = trie

from_vocab(vocab, **kwargs) classmethod

Creates an AsyncTokenByteTrie from a vocabulary.

Parameters:

Name Type Description Default
vocab list

The vocabulary over which the trie will be defined.

required
**kwargs dict

Additional arguments passed to the trie constructor. Can include 'eos_tokens' for EOS support.

{}

Returns:

Type Description
AsyncTokenByteTrie

The initialized asynchronous trie instance.

Source code in genlm/bytes/trie.py
@classmethod
def from_vocab(cls, vocab, **kwargs):
    """Build an `AsyncTokenByteTrie` directly from a vocabulary.

    Args:
        vocab (list): The vocabulary over which the trie will be defined.
        **kwargs (dict): Extra keyword arguments forwarded to the
            `TokenByteTrie` constructor (e.g. 'eos_tokens' for EOS support).

    Returns:
        (AsyncTokenByteTrie): A new wrapper around the freshly built trie.
    """
    return cls(TokenByteTrie(decode=vocab, **kwargs))

weight_sum(ws, mode=None) async

Queue a weight_sum request. Multiple concurrent calls will be automatically batched together by (operation, mode) pairs.

Parameters:

Name Type Description Default
ws Tensor

Token weights, shape (len(self.trie.decode),).

required
mode TrieMode

Trie mode determining EOS handling. Defaults to WITHOUT_EOS.

None

Returns:

Type Description
ndarray

The calculated mass sums for the given distribution.

Source code in genlm/bytes/trie.py
async def weight_sum(self, ws, mode=None):
    """Submit a `weight_sum` request and wait for its result.

    Concurrent calls are batched together by (operation, mode) pairs.

    Args:
        ws (torch.Tensor): Token weights, shape (`len(self.trie.decode)`,).
        mode (TrieMode, optional): Trie mode determining EOS handling. Defaults to WITHOUT_EOS.

    Returns:
        (np.ndarray): The calculated mass sums for the given distribution.
    """
    # A falsy `mode` (including the default None) selects WITHOUT_EOS.
    effective_mode = mode if mode else TrieMode.WITHOUT_EOS
    pending = self._queue_request(ws, effective_mode, TrieOp.SUM)
    return await pending

weight_max(ws) async

Queue a weight_max request. Multiple concurrent calls will be automatically batched together.

Parameters:

Name Type Description Default
ws Tensor

Token weights, shape (len(self.trie.decode),).

required

Returns:

Type Description
ndarray

The calculated max weights for the given distribution.

Source code in genlm/bytes/trie.py
async def weight_max(self, ws):
    """Submit a `weight_max` request and wait for its result.

    Concurrent calls are batched together.

    Args:
        ws (torch.Tensor): Token weights, shape (`len(self.trie.decode)`,).

    Returns:
        (np.ndarray): The calculated max weights for the given distribution.
    """
    # Mode is irrelevant for MAX; WITHOUT_EOS is passed only as a placeholder.
    pending = self._queue_request(ws, TrieMode.WITHOUT_EOS, TrieOp.MAX)
    return await pending

start()

Start the background processing task if not already running.

Source code in genlm/bytes/trie.py
def start(self):
    """Launch the background processing task unless one is already running."""
    current = self._task
    if current and not current.done():
        # A live task is already serving the queue; nothing to do.
        return

    logger.debug("starting background loop")
    # A fresh queue is created so that it is bound to the current event loop.
    self._queue = asyncio.Queue()
    self._task = asyncio.create_task(self._background_loop())

cleanup() async

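Asynchronous cleanup (the preferred method): cancels the background task, if it is still running, and waits for the cancellation to finish.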

Source code in genlm/bytes/trie.py
async def cleanup(self):
    """Async cleanup - preferred method.

    Cancels the background task when it is still running, waits for the
    cancellation to complete, then clears the task reference. Does nothing
    when there is no task or the task has already finished.
    """
    task = self._task
    if not task or task.done():
        return

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome of awaiting the task we just cancelled.
        pass
    self._task = None

shutdown()

Stop the background processing task and cleanup resources.

Source code in genlm/bytes/trie.py
def shutdown(self):
    """Cancel the background processing task and drop the reference to it.

    Cancellation is only requested; this method does not wait for the task
    to finish.
    """
    task = self._task
    if task is None:
        return

    try:
        task.cancel()
    except RuntimeError:  # pragma: no cover
        # Ignore runtime errors that might occur if event loop is closed
        pass
    self._task = None

Chart

Bases: dict

A specialized dictionary for managing probability distributions.

Extends dict with operations useful for probability distributions and numeric computations, including arithmetic operations, normalization, and visualization.

Parameters:

Name Type Description Default
zero Any

Default value for missing keys

required
vals tuple

Initial (key, value) pairs

()
Source code in genlm/bytes/util.py
class Chart(dict):
    """A specialized dictionary for managing probability distributions.

    Extends dict with operations useful for probability distributions and numeric computations,
    including arithmetic operations, normalization, and visualization.

    Indexing a missing key yields `zero` without storing the key. Methods that
    produce a chart return a new `Chart` and leave the receiver unmodified.

    Args:
        zero (Any): Default value for missing keys
        vals (tuple, optional): Initial (key, value) pairs
    """

    def __init__(self, zero, vals=()):
        self.zero = zero
        super().__init__(vals)

    def __missing__(self, k):
        # Invoked by `dict.__getitem__` for absent keys; nothing is inserted.
        return self.zero

    def spawn(self):
        """Return an empty chart with the same `zero`."""
        return Chart(self.zero)

    def __add__(self, other):
        """Return the key-wise sum of `self` and `other` over the union of their keys."""
        new = self.spawn()
        for k, v in self.items():
            new[k] += v
        for k, v in other.items():
            new[k] += v
        return new

    def __mul__(self, other):
        """Return the key-wise product over the keys of `self`.

        Products equal to `zero` are left out of the result.
        """
        new = self.spawn()
        for k in self:
            v = self[k] * other[k]
            if v == self.zero:
                continue
            new[k] += v
        return new

    def copy(self):
        """Return a shallow copy that keeps the same `zero`."""
        return Chart(self.zero, self)

    def trim(self):
        """Return a copy without the entries whose value equals `zero`."""
        return Chart(self.zero, {k: v for k, v in self.items() if v != self.zero})

    def metric(self, other):
        """Return the largest absolute key-wise difference between two charts.

        Keys are taken from the union of both charts; the result is 0 when
        both are empty.
        """
        assert isinstance(other, Chart)
        err = 0
        for x in self.keys() | other.keys():
            err = max(err, abs(self[x] - other[x]))
        return err

    def _repr_html_(self):
        """Return an HTML rendering of the non-zero entries built with `format_table`."""
        return (
            '<div style="font-family: Monospace;">'
            + format_table(self.trim().items(), headings=["key", "value"])
            + "</div>"
        )

    def __repr__(self):
        # Same as the repr of a plain dict holding only the non-zero entries.
        return repr({k: v for k, v in self.items() if v != self.zero})

    def __str__(self, style_value=lambda k, v: str(v)):
        """Return a multi-line listing of the non-zero entries, largest value first.

        Args:
            style_value (Callable): Maps `(key, value)` to the text shown for the value.
        """

        def key(k):
            return -self[k]

        return (
            "Chart {\n"
            + "\n".join(
                f"  {k!r}: {style_value(k, self[k])},"
                for k in sorted(self, key=key)
                if self[k] != self.zero
            )
            + "\n}"
        )

    def assert_equal(self, want, *, domain=None, tol=1e-5, verbose=False, throw=True):
        """Check that this chart matches `want` to within `tol` on every key of `domain`.

        Args:
            want (Chart | Mapping): Expected values; wrapped in a `Chart` with
                this chart's `zero` when it is not one already.
            domain (Iterable, optional): Keys to compare. Defaults to the union
                of the keys of both charts.
            tol (float): Largest accepted absolute difference per key.
            verbose (bool): Print one line per compared key.
            throw (bool): Raise when a key is out of tolerance.

        Raises:
            AssertionError: If `throw` is set and some key differs by more than
                `tol` (only the first such key is reported), or if both
                `verbose` and `throw` are false.
        """
        if not isinstance(want, Chart):
            want = Chart(self.zero, want)
        if domain is None:
            domain = self.keys() | want.keys()
        assert verbose or throw
        errors = []
        for x in domain:
            if abs(self[x] - want[x]) <= tol:
                if verbose:
                    print(colors.mark(True), x, self[x])
            else:
                if verbose:
                    print(colors.mark(False), x, self[x], want[x])
                errors.append(x)
        if throw and errors:
            x = errors[0]
            raise AssertionError(f"{x}: {self[x]} {want[x]}")

    def argmax(self):
        """Return a key with the largest value."""
        return max(self, key=self.__getitem__)

    def argmin(self):
        """Return a key with the smallest value."""
        return min(self, key=self.__getitem__)

    def top(self, k):
        """Return a new chart holding the `k` entries with the largest values."""
        best = sorted(self, key=self.__getitem__, reverse=True)[:k]
        return Chart(self.zero, {key: self[key] for key in best})

    def max(self):
        """Return the largest value."""
        return max(self.values())

    def min(self):
        """Return the smallest value."""
        return min(self.values())

    def sum(self):
        """Return the sum of all values."""
        return sum(self.values())

    def sort(self, **kwargs):
        """Return a copy whose entries are ordered by `sorted(keys, **kwargs)`."""
        return Chart(self.zero, [(k, self[k]) for k in sorted(self, **kwargs)])

    def sort_descending(self):
        """Return a copy whose entries are ordered from largest to smallest value."""
        return Chart(
            self.zero, [(k, self[k]) for k in sorted(self, key=lambda k: -self[k])]
        )

    def normalize(self):
        """Return a copy with every value divided by the total.

        When the total is 0 the values are left as they are; a copy is still
        returned so that changing the result never affects this chart.
        """
        Z = self.sum()
        if Z == 0:
            return self.copy()
        return Chart(self.zero, [(k, v / Z) for k, v in self.items()])

    def filter(self, f):
        """Return a copy keeping only the entries whose key satisfies `f`."""
        return Chart(self.zero, [(k, v) for k, v in self.items() if f(k)])

    def map_values(self, f):
        """Return a copy with `f` applied to every value and to `zero`."""
        return Chart(f(self.zero), [(k, f(v)) for k, v in self.items()])

    def map_keys(self, f):
        """Return a copy with `f` applied to every key.

        When several keys map to the same new key, the last one wins; use
        `project` to sum them instead.
        """
        return Chart(self.zero, [(f(k), v) for k, v in self.items()])

    def project(self, f):
        "Apply the function `f` to each key; summing when f-transformed keys overlap."
        out = self.spawn()
        for k, v in self.items():
            out[f(k)] += v
        return out

    # TODO: the more general version of this method is join
    def compare(self, other, *, domain=None):
        """Return a DataFrame comparing this chart with `other` key by key.

        Args:
            other (Chart | Mapping): Chart to compare against; wrapped in a
                `Chart` with this chart's `zero` when it is not one already.
            domain (Iterable, optional): Keys to compare. Defaults to the union
                of the keys of both charts.

        Returns:
            (pd.DataFrame): One row per key, built from the fields `key`,
                `self`, `other` and `metric` (the absolute difference).
        """
        if not isinstance(other, Chart):
            other = Chart(self.zero, other)
        if domain is None:
            domain = self.keys() | other.keys()
        rows = []
        for x in domain:
            m = abs(self[x] - other[x])
            rows.append(dict(key=x, self=self[x], other=other[x], metric=m))
        return pd.DataFrame(rows)

    def to_dict(self):
        """Return the entries as a plain `dict`."""
        return dict(self)

project(f)

Apply the function f to each key; summing when f-transformed keys overlap.

Source code in genlm/bytes/util.py
def project(self, f):
    """Re-key the chart through `f`, adding up values whose keys collide.

    Args:
        f (Callable): Maps each existing key to its new key.

    Returns:
        A chart obtained from `self.spawn()` holding the accumulated values.
    """
    projected = self.spawn()
    for key, value in self.items():
        target = f(key)
        projected[target] = projected[target] + value
    return projected