Skip to content

API Reference

This page documents all public functions and objects in papa2. Each module's docstrings are rendered directly from the source.


papa2.dada — Core Denoising

The main denoising module. Provides the high-level dada() and learn_errors() functions as well as the global options dictionary.

DADA_OPTS

papa2.DADA_OPTS

Global dictionary of algorithmic parameters used by dada(). Modify with set_dada_opt() or pass keyword arguments directly to dada().

Parameter Default Description
OMEGA_A 1e-40 Significance threshold for accepting new ASVs
OMEGA_P 1e-4 Significance threshold for prior-guided detection
OMEGA_C 1e-40 Significance threshold for combining ASVs
DETECT_SINGLETONS False Detect singleton ASVs
USE_KMERS True Use k-mer screen before alignment
KDIST_CUTOFF 0.42 K-mer distance cutoff
MAX_CONSIST 10 Maximum self-consistency iterations
MATCH 5 NW match score
MISMATCH -4 NW mismatch penalty
GAP_PENALTY -8 NW gap penalty
BAND_SIZE 16 Banded alignment width
VECTORIZED_ALIGNMENT True Use vectorized (SSE) alignment
MAX_CLUST 0 Maximum clusters (0 = unlimited)
MIN_FOLD 1 Minimum fold-abundance for parent
MIN_HAMMING 1 Minimum Hamming distance from parent
MIN_ABUNDANCE 1 Minimum read abundance to consider
USE_QUALS True Incorporate quality scores
HOMOPOLYMER_GAP_PENALTY None Override gap penalty in homopolymer runs
SSE 2 SSE level (0=off, 1=SSE2, 2=SSE4.1)
GAPLESS True Prefer gapless alignments
GREEDY True Use greedy clustering

dada

dada(derep, err=None, error_estimation_function=None, self_consist=False, verbose=True, **opts)

Run DADA2 denoising on one or more dereplicated samples.

Parameters:

Name Type Description Default
derep

dict from derep_fastq(), or list of dicts, or FASTQ filepath(s)

required
err

numpy array (16, ncol) error matrix, or None for self-consistent learning

None
error_estimation_function

callable(trans) -> err_matrix, default loess_errfun

None
self_consist

bool, iterate until error model converges

False
verbose

bool

True

Returns:

Type Description

dict (single sample) or list of dicts, each with: denoised: dict {seq: abundance} cluster_seqs, cluster_abunds, trans, map, pval, err_in, err_out

Environment variables

DADA2_WORKERS: number of parallel workers (0 = auto-detect, default).
OMP_NUM_THREADS: set to 1 before importing for best multi-sample performance.

Source code in papa2/dada.py
def dada(derep, err=None, error_estimation_function=None, self_consist=False,
         verbose=True, **opts):
    """Run DADA2 denoising on one or more dereplicated samples.

    Args:
        derep: dict from derep_fastq(), or list of dicts, or FASTQ filepath(s)
        err: numpy array (16, ncol) error matrix, or None for self-consistent learning
        error_estimation_function: callable(trans) -> err_matrix, default loess_errfun
        self_consist: bool, iterate until error model converges
        verbose: bool
        **opts: per-call overrides for entries in DADA_OPTS (e.g. OMEGA_A)

    Returns:
        dict (single sample) or list of dicts, each with:
            denoised: dict {seq: abundance}
            cluster_seqs, cluster_abunds, trans, map, pval, err_in, err_out

    Raises:
        ValueError: if err is None and self_consist is False.

    Environment variables:
        DADA2_WORKERS: number of parallel workers (0 = auto-detect, default)
        OMP_NUM_THREADS: set to 1 before importing for best multi-sample performance
    """
    if error_estimation_function is None:
        error_estimation_function = loess_errfun

    # Remembered only when the caller passed filepath(s); enables the
    # worker-side dereplication fast path below.
    file_inputs = None

    # Normalize input to list of derep dicts
    if isinstance(derep, str):
        file_inputs = [derep]
        derep = [derep_fastq(derep, verbose=verbose)]
    elif isinstance(derep, dict):
        derep = [derep]
    elif isinstance(derep, list) and len(derep) > 0 and isinstance(derep[0], str):
        file_inputs = list(derep)
        # Dereplicate up front only when the error model must be learned from
        # the data; otherwise derep stays a list of paths and each worker
        # dereplicates its own file (see the file_inputs fast path below).
        derep = [derep_fastq(f, verbose=verbose) for f in derep] if self_consist or err is None else derep

    # A single input returns a bare dict at the end instead of a 1-element list.
    single = len(file_inputs) == 1 if file_inputs is not None else len(derep) == 1

    # Merge options
    o = dict(DADA_OPTS)
    o.update(opts)

    # Initialize error matrix (matching R: all 1.0)
    initialize_err = False
    if self_consist and err is None:
        # Size the matrix to the largest observed quality score (min 41 cols).
        max_q = 0
        for d in derep:
            if d["quals"].size > 0 and not np.all(np.isnan(d["quals"])):
                max_q = max(max_q, int(np.nanmax(d["quals"])) + 1)
        err = get_initial_err(max(41, max_q))
        initialize_err = True

    if err is None:
        raise ValueError("Error matrix (err) must be provided unless self_consist=True")

    err_history = []
    nconsist = 0 if initialize_err else 1  # R: init at 0, otherwise start at 1

    # CPU-only standalone work must use processes, not threads:
    # the shared library is not thread-safe across concurrent run_dada
    # invocations.
    n_workers = int(os.environ.get("DADA2_WORKERS", "0"))
    if n_workers == 0:
        cores = os.cpu_count() or 1
        n_workers = min(len(derep), cores)
    use_parallel = len(derep) > 1 and n_workers > 1

    # Fast path: filepath inputs with a fixed error model. Each worker reads
    # and dereplicates its own file; no self-consistency loop is needed.
    if file_inputs is not None and not self_consist:
        max_clust_iter = o["MAX_CLUST"]
        if use_parallel:
            work_args = [(fpath, err, o, max_clust_iter, verbose) for fpath in file_inputs]
            with ProcessPoolExecutor(max_workers=n_workers) as pool:
                results = list(pool.map(_run_one_file_sample, work_args))
        else:
            results = []
            for fpath in file_inputs:
                results.append(_run_one_file_sample((fpath, err, o, max_clust_iter, verbose)))

        # err_history is empty on this path, so err_in is None and err_out
        # simply echoes the caller-supplied matrix.
        for res in results:
            res["err_in"] = err_history[0] if err_history else None
            res["err_out"] = err

        if single:
            return results[0]
        return results

    while True:
        if verbose and self_consist:
            sys.stdout.write(f"   selfConsist step {nconsist}")
            sys.stdout.flush()

        # nconsist == 0 only on the initialization pass; its all-ones matrix
        # is deliberately excluded from the convergence history.
        if nconsist > 0:
            err_history.append(err.copy())

        # R uses MAX_CLUST=1 on the initialization pass (nconsist==1 after init)
        max_clust_iter = 1 if initialize_err else o["MAX_CLUST"]

        # Extend error matrix once for all samples
        # (columns index quality scores; the last column is replicated to
        # cover scores beyond the matrix's current width)
        max_q_all = max((int(np.nanmax(d["quals"])) + 1 if d["quals"].size and not np.all(np.isnan(d["quals"])) else 0) for d in derep)
        erri = err.copy()
        if max_q_all > erri.shape[1]:
            extra = np.tile(erri[:, -1:], (1, max_q_all - erri.shape[1]))
            erri = np.hstack([erri, extra])

        if use_parallel:
            # Parallel multi-sample execution.
            work_args = [(drp, erri, o, max_clust_iter, verbose) for drp in derep]
            with ProcessPoolExecutor(max_workers=n_workers) as pool:
                results = list(pool.map(_run_one_sample, work_args))
            if verbose and self_consist:
                sys.stdout.write("." * len(derep))
                sys.stdout.flush()
        else:
            # Sequential path for single-sample or single-worker execution.
            results = []
            for drp in derep:
                if verbose and self_consist:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                res = _run_one_sample((drp, erri, o, max_clust_iter, verbose))
                results.append(res)

        trans_list = [r["trans"] for r in results]

        if verbose and self_consist:
            print()

        # Accumulate transitions
        cur_trans = _accumulate_trans(trans_list)

        # Estimate new error rates
        err = error_estimation_function(cur_trans)

        # Check convergence
        if not self_consist:
            break

        # After initialization pass: set self-transitions to 1.0 (matching R)
        if initialize_err:
            err[0, :] = 1.0   # A2A
            err[5, :] = 1.0   # C2C
            err[10, :] = 1.0  # G2G
            err[15, :] = 1.0  # T2T
            initialize_err = False

        # Converged when the new matrix is bitwise-equal to any earlier round
        # (comparing against the whole history also catches oscillations).
        converged = any(np.array_equal(err, h) for h in err_history)
        if converged:
            if verbose:
                print(f"Convergence after {nconsist} rounds.")
            break

        if nconsist >= o["MAX_CONSIST"]:
            if verbose:
                print(f"Self-consistency loop terminated before convergence.")
            break

        nconsist += 1

    # Attach error info to results
    # (err_in is the full per-round history when self-consistent, otherwise
    # the single input matrix)
    for res in results:
        res["err_in"] = list(err_history) if self_consist else err_history[0]
        res["err_out"] = err

    if single:
        return results[0]
    return results

learn_errors

learn_errors(fastq_files, nbases=100000000.0, error_estimation_function=None, verbose=True, **opts)

Learn error rates from FASTQ files.

Parameters:

Name Type Description Default
fastq_files

list of FASTQ file paths

required
nbases

target number of bases to use for learning

100000000.0
error_estimation_function

callable, default loess_errfun

None
verbose

bool

True

Returns:

Type Description

numpy array (16, ncol) of learned error rates

Source code in papa2/dada.py
def learn_errors(fastq_files, nbases=1e8, error_estimation_function=None,
                 verbose=True, **opts):
    """Learn error rates from FASTQ files.

    Dereplicates files one at a time until roughly ``nbases`` bases have
    been collected (always at least one file), then runs dada() on those
    samples in self-consistent mode.

    Args:
        fastq_files: FASTQ file path, or list of paths
        nbases: target number of bases to use for learning
        error_estimation_function: callable, default loess_errfun
        verbose: bool
        **opts: extra DADA options forwarded to dada()

    Returns:
        numpy array (16, ncol) of learned error rates
    """
    paths = [fastq_files] if isinstance(fastq_files, str) else fastq_files

    # Pull in samples until the base budget is met
    dereps = []
    total_bases = 0
    for path in paths:
        sample = derep_fastq(path, verbose=verbose)
        dereps.append(sample)
        read_count = sample["abundances"].sum()
        read_len = len(sample["seqs"][0]) if sample["seqs"] else 0
        total_bases += read_count * read_len
        if total_bases >= nbases:
            break

    if verbose:
        n_reads_total = sum(d["abundances"].sum() for d in dereps)
        print(f"{int(total_bases)} total bases in {n_reads_total} reads "
              f"from {len(dereps)} samples will be used for learning the error rates.")
        print("Initializing error rates to maximum possible estimate.")

    # R's learnErrors wrapper forces OMEGA_C=0 during learning without
    # changing the general dada() default; explicit caller overrides win.
    learn_opts = {"OMEGA_C": 0, **opts}

    out = dada(dereps, err=None,
               error_estimation_function=error_estimation_function,
               self_consist=True, verbose=verbose, **learn_opts)

    # dada() returns a bare dict for a single sample; normalise to a list.
    first = out[0] if isinstance(out, list) else out
    return first["err_out"]

set_dada_opt

set_dada_opt(**kwargs)

Source code in papa2/dada.py
def set_dada_opt(**kwargs):
    """Update global DADA options in place.

    Args:
        **kwargs: option-name/value pairs; each name must already exist
            in DADA_OPTS.

    Raises:
        KeyError: on the first unknown option name encountered.
    """
    for name, value in kwargs.items():
        if name not in DADA_OPTS:
            raise KeyError(f"Unknown DADA option: {name}")
        DADA_OPTS[name] = value

get_dada_opt

get_dada_opt(key=None)

Source code in papa2/dada.py
def get_dada_opt(key=None):
    """Look up current DADA option value(s).

    Args:
        key: option name, or None to retrieve all options.

    Returns:
        The value for ``key``, or a shallow copy of the full options
        dict when ``key`` is None.

    Raises:
        KeyError: if ``key`` is not a known option.
    """
    if key is not None:
        return DADA_OPTS[key]
    return dict(DADA_OPTS)

papa2.io — FASTQ I/O

FASTQ reading and dereplication.

derep_fastq

derep_fastq(filepath, verbose=False, with_map=False)

Dereplicate a FASTQ file.

Uses C implementation (zlib) when available for ~2x speedup. Always returns the per-read map (read_idx -> unique_idx). The with_map parameter is accepted for backward compatibility but ignored.

Returns:

Type Description

dict with keys: seqs: list[str], unique sequences sorted by abundance (descending) abundances: numpy int32 array quals: numpy float64 array (n_uniques x max_seqlen), average quality map: numpy int32 array, maps each read to its unique index (0-indexed)

Source code in papa2/io.py
def derep_fastq(filepath, verbose=False, with_map=False):
    """Dereplicate a FASTQ file.

    Uses C implementation (zlib) when available for ~2x speedup.
    Always returns the per-read map (read_idx -> unique_idx).
    The with_map parameter is accepted for backward compatibility but ignored.

    Args:
        filepath: path to a FASTQ file, plain or gzip-compressed (.gz).
        verbose: print a one-line summary of reads and unique sequences.
        with_map: ignored; retained for backward compatibility.

    Returns:
        dict with keys:
            seqs: list[str], unique sequences sorted by abundance (descending)
            abundances: numpy int32 array
            quals: numpy float64 array (n_uniques x max_seqlen), average quality
            map: numpy int32 array, maps each read to its unique index (0-indexed)
    """
    # Use C implementation if available
    if _HAS_C_DEREP:
        result = _derep_fastq_c(filepath)
        if verbose:
            print(f"Read {result['abundances'].sum()} reads, {len(result['seqs'])} unique sequences")
        return result

    # Fallback: Python implementation
    opener = gzip.open if filepath.endswith(".gz") else open
    with opener(filepath, "rb") as f:
        raw = f.read()

    # splitlines() handles \n, \r\n and \r endings and never yields a
    # trailing empty element.  The previous split(b'\n') left a stray b'\r'
    # on every line of CRLF-encoded files, which corrupted dedup keys,
    # embedded '\r' in the decoded sequences, and shifted quality columns.
    lines = raw.splitlines()
    n_reads = len(lines) // 4  # 4 lines per record; headers/separators unused

    # Phase 1: Build dedup index from sequences
    seq_to_idx = {}
    first_seen = []
    counts_list = []
    read_uid = np.empty(n_reads, dtype=np.int32)

    for i in range(n_reads):
        seq = lines[i * 4 + 1].upper()  # bytes; dedup is case-insensitive
        idx = seq_to_idx.get(seq)
        if idx is None:
            idx = len(first_seen)
            seq_to_idx[seq] = idx
            first_seen.append(seq)
            counts_list.append(0)
        counts_list[idx] += 1
        read_uid[i] = idx

    n_uniques = len(first_seen)
    counts = np.array(counts_list, dtype=np.int32)
    maxlen = max(len(s) for s in first_seen) if first_seen else 0

    # Phase 2: Accumulate quality scores using numpy frombuffer
    qual_sums = np.zeros((n_uniques, maxlen), dtype=np.float64)

    for i in range(n_reads):
        uid = read_uid[i]
        qline = lines[i * 4 + 3]
        q = np.frombuffer(qline, dtype=np.uint8).astype(np.float64)
        q -= 33.0  # assumes Phred+33 encoding (modern Illumina) — TODO confirm
        slen = len(q)
        qual_sums[uid, :slen] += q

    # Phase 3: Sort by abundance descending.  A stable sort keeps ties in
    # first-seen (file) order, making the output deterministic.
    sort_idx = np.argsort(-counts, kind="stable")
    sorted_seqs = [first_seen[i].decode('ascii') for i in sort_idx]
    sorted_counts = counts[sort_idx]
    sorted_quals = qual_sums[sort_idx]  # fancy indexing copies; in-place edits below are safe

    # Average per unique sequence and NaN-pad positions past its length
    for i in range(n_uniques):
        slen = len(sorted_seqs[i])
        if sorted_counts[i] > 0:
            sorted_quals[i, :slen] /= sorted_counts[i]
        sorted_quals[i, slen:] = np.nan

    # Remap read indices from first-seen order to abundance-sorted order
    inv_sort = np.empty(n_uniques, dtype=np.int32)
    inv_sort[sort_idx] = np.arange(n_uniques, dtype=np.int32)
    rmap = inv_sort[read_uid]

    if verbose:
        print(f"Read {n_reads} reads, {n_uniques} unique sequences")

    return {
        "seqs": sorted_seqs,
        "abundances": sorted_counts,
        "quals": sorted_quals,
        "map": rmap,
    }

papa2.filter — Filtering and Trimming

Quality filtering, trimming, and PhiX removal for FASTQ files.

filter_and_trim

filter_and_trim(fwd, filt, rev=None, filt_rev=None, *, trim_left=0, trim_right=0, trunc_len=0, trunc_q=None, max_len=0, min_len=20, max_n=0, min_q=0, max_ee=float('inf'), rm_phix=True, rm_lowcomplex=0.0, orient_fwd=None, compress=True, multithread=False, verbose=False)

Filter and trim FASTQ files (single- or paired-end).

This is a convenience wrapper around :func:fastq_filter (single-end) and :func:fastq_paired_filter (paired-end). It dispatches to the appropriate function based on whether rev is provided and optionally parallelises across files with :class:~concurrent.futures.ProcessPoolExecutor.

Parameters

fwd : str or list of str Path(s) to forward (or single-end) input FASTQ file(s). filt : str or list of str Path(s) to filtered output FASTQ file(s). Must be the same length as fwd. rev : str, list of str, or None Path(s) to reverse-read input FASTQ file(s). None for single-end mode. filt_rev : str, list of str, or None Path(s) to filtered reverse-read output FASTQ file(s). Required when rev is not None. trim_left, trim_right, trunc_len, trunc_q, max_len, min_len, max_n, min_q, max_ee, rm_phix, rm_lowcomplex, orient_fwd, compress See :func:fastq_filter and :func:fastq_paired_filter. multithread : bool or int If True, use all available cores. If an integer > 1, use that many worker processes. False or 1 disables parallelism. verbose : bool Print per-file summaries.

Returns

numpy.ndarray Integer array of shape (n_files, 2) with columns [reads_in, reads_out].

Source code in papa2/filter.py
def filter_and_trim(
    fwd: Union[str, Sequence[str]],
    filt: Union[str, Sequence[str]],
    rev: Optional[Union[str, Sequence[str]]] = None,
    filt_rev: Optional[Union[str, Sequence[str]]] = None,
    *,
    trim_left: Union[int, Tuple[int, int]] = 0,
    trim_right: Union[int, Tuple[int, int]] = 0,
    trunc_len: Union[int, Tuple[int, int]] = 0,
    trunc_q: Union[None, int, Tuple[Optional[int], Optional[int]]] = None,
    max_len: Union[int, Tuple[int, int]] = 0,
    min_len: Union[int, Tuple[int, int]] = 20,
    max_n: Union[int, Tuple[int, int]] = 0,
    min_q: Union[int, Tuple[int, int]] = 0,
    max_ee: Union[float, Tuple[float, float]] = float("inf"),
    rm_phix: bool = True,
    rm_lowcomplex: Union[float, Tuple[float, float]] = 0.0,
    orient_fwd: Optional[str] = None,
    compress: bool = True,
    multithread: Union[bool, int] = False,
    verbose: bool = False,
) -> np.ndarray:
    """Filter and trim FASTQ files (single- or paired-end).

    This is a convenience wrapper around :func:`fastq_filter` (single-end)
    and :func:`fastq_paired_filter` (paired-end).  It dispatches to the
    appropriate function based on whether *rev* is provided and optionally
    parallelises across files with :class:`~concurrent.futures.ProcessPoolExecutor`.

    Parameters
    ----------
    fwd : str or list of str
        Path(s) to forward (or single-end) input FASTQ file(s).
    filt : str or list of str
        Path(s) to filtered output FASTQ file(s).  Must be the same length
        as *fwd*.
    rev : str, list of str, or None
        Path(s) to reverse-read input FASTQ file(s).  ``None`` for
        single-end mode.
    filt_rev : str, list of str, or None
        Path(s) to filtered reverse-read output FASTQ file(s).  Required
        when *rev* is not ``None``.
    trim_left, trim_right, trunc_len, trunc_q, max_len, min_len,
    max_n, min_q, max_ee, rm_phix, rm_lowcomplex, orient_fwd, compress
        See :func:`fastq_filter` and :func:`fastq_paired_filter`.
    multithread : bool or int
        If ``True``, use all available cores.  If an integer > 1, use
        that many worker processes.  ``False`` or ``1`` disables
        parallelism.
    verbose : bool
        Print per-file summaries.

    Returns
    -------
    numpy.ndarray
        Integer array of shape ``(n_files, 2)`` with columns
        ``[reads_in, reads_out]``.
    """
    # Normalise to lists
    if isinstance(fwd, str):
        fwd = [fwd]
    if isinstance(filt, str):
        filt = [filt]
    if isinstance(rev, str):
        rev = [rev]
    if isinstance(filt_rev, str):
        filt_rev = [filt_rev]

    n_files = len(fwd)
    if len(filt) != n_files:
        raise ValueError(
            f"fwd ({n_files} files) and filt ({len(filt)} files) must have "
            f"the same length."
        )

    paired = rev is not None
    if paired:
        if filt_rev is None:
            raise ValueError("filt_rev is required when rev is provided.")
        if len(rev) != n_files:
            raise ValueError(
                f"fwd ({n_files}) and rev ({len(rev)}) must have the same length."
            )
        if len(filt_rev) != n_files:
            raise ValueError(
                f"fwd ({n_files}) and filt_rev ({len(filt_rev)}) must have "
                f"the same length."
            )

    # Determine number of workers
    # NOTE: the identity check for True must precede the isinstance check
    # because bool is a subclass of int in Python.
    if multithread is True:
        n_workers = os.cpu_count() or 1
    elif isinstance(multithread, int) and multithread > 1:
        n_workers = multithread
    else:
        n_workers = 1

    # Common kwargs (shared by all files)
    common_kw = dict(
        trim_left=trim_left,
        trim_right=trim_right,
        trunc_len=trunc_len,
        trunc_q=trunc_q,
        max_len=max_len,
        min_len=min_len,
        max_n=max_n,
        min_q=min_q,
        max_ee=max_ee,
        rm_phix=rm_phix,
        rm_lowcomplex=rm_lowcomplex,
        orient_fwd=orient_fwd,
        compress=compress,
        verbose=verbose,
    )

    results = np.zeros((n_files, 2), dtype=np.int64)

    # Build per-file argument tuples (picklable for ProcessPoolExecutor)
    if paired:
        tasks = [
            (fwd[i], filt[i], rev[i], filt_rev[i], common_kw)
            for i in range(n_files)
        ]
    else:
        # Normalise tuple params to scalars for single-end: fastq_filter
        # takes scalars, so any (fwd, rev) pair collapses to its forward
        # (first) value.
        se_kw = dict(common_kw)
        for key in (
            "trim_left", "trim_right", "trunc_len", "trunc_q",
            "max_len", "min_len", "max_n", "min_q", "max_ee",
            "rm_lowcomplex",
        ):
            v = se_kw[key]
            if isinstance(v, (list, tuple)):
                se_kw[key] = v[0]
        tasks = [(fwd[i], filt[i], se_kw) for i in range(n_files)]

    if n_workers > 1 and n_files > 1:
        with ProcessPoolExecutor(max_workers=n_workers) as pool:
            if paired:
                futures = {
                    pool.submit(_run_paired_task, t): i
                    for i, t in enumerate(tasks)
                }
            else:
                futures = {
                    pool.submit(_run_single_task, t): i
                    for i, t in enumerate(tasks)
                }
            # The future -> index map keeps results[i] aligned with fwd[i]
            # regardless of completion order.
            for fut in futures:
                i = futures[fut]
                results[i] = fut.result()
    else:
        for i, t in enumerate(tasks):
            if paired:
                results[i] = _run_paired_task(t)
            else:
                results[i] = _run_single_task(t)

    return results

fastq_filter

fastq_filter(fn, fout, *, trim_left=0, trim_right=0, trunc_len=0, trunc_q=None, max_len=0, min_len=20, max_n=0, min_q=0, max_ee=float('inf'), rm_phix=True, rm_lowcomplex=0.0, orient_fwd=None, compress=True, verbose=False)

Filter and trim a single FASTQ file.

Parameters

fn : str Path to input FASTQ file (plain or gzipped). fout : str Path to output FASTQ file. trim_left : int Number of bases to trim from the 5' end. trim_right : int Number of bases to trim from the 3' end. trunc_len : int Truncate reads to exactly this length after trimming. Reads shorter than trunc_len after left/right trimming are discarded. Set to 0 to disable. trunc_q : int or None Truncate at the first quality score <= this value. None disables quality truncation. max_len : int Discard reads longer than this before trimming. 0 disables. min_len : int Discard reads shorter than this after all trimming. max_n : int Maximum number of ambiguous (non-ACGT) bases allowed. min_q : int Discard reads containing any quality score below this value. max_ee : float Maximum expected errors (sum(10^(-Q/10))). rm_phix : bool Remove reads matching the PhiX genome. rm_lowcomplex : float Remove reads with sequence complexity below this threshold. 0 disables. orient_fwd : str or None If set, orient reads so they begin with this primer sequence. Reads that match neither in the forward nor reverse-complement orientation are discarded. compress : bool Gzip-compress the output file. verbose : bool Print a summary line when done.

Returns

tuple of (int, int) (reads_in, reads_out)

Source code in papa2/filter.py
def fastq_filter(
    fn: str,
    fout: str,
    *,
    trim_left: int = 0,
    trim_right: int = 0,
    trunc_len: int = 0,
    trunc_q: Optional[int] = None,
    max_len: int = 0,
    min_len: int = 20,
    max_n: int = 0,
    min_q: int = 0,
    max_ee: float = float("inf"),
    rm_phix: bool = True,
    rm_lowcomplex: float = 0.0,
    orient_fwd: Optional[str] = None,
    compress: bool = True,
    verbose: bool = False,
) -> Tuple[int, int]:
    """Filter and trim a single FASTQ file.

    Parameters
    ----------
    fn : str
        Path to input FASTQ file (plain or gzipped).
    fout : str
        Path to output FASTQ file.
    trim_left : int
        Number of bases to trim from the 5' end.
    trim_right : int
        Number of bases to trim from the 3' end.
    trunc_len : int
        Truncate reads to exactly this length after trimming.  Reads
        shorter than *trunc_len* after left/right trimming are discarded.
        Set to 0 to disable.
    trunc_q : int or None
        Truncate at the first quality score ``<=`` this value.  ``None``
        disables quality truncation.
    max_len : int
        Discard reads longer than this **before** trimming.  0 disables.
    min_len : int
        Discard reads shorter than this **after** all trimming.
    max_n : int
        Maximum number of ambiguous (non-ACGT) bases allowed.
    min_q : int
        Discard reads containing any quality score below this value.
    max_ee : float
        Maximum expected errors (``sum(10^(-Q/10))``).
    rm_phix : bool
        Remove reads matching the PhiX genome.
    rm_lowcomplex : float
        Remove reads with sequence complexity below this threshold.
        0 disables.
    orient_fwd : str or None
        If set, orient reads so they begin with this primer sequence.
        Reads that match neither in the forward nor reverse-complement
        orientation are discarded.
    compress : bool
        Gzip-compress the output file.
    verbose : bool
        Print a summary line when done.

    Returns
    -------
    tuple of (int, int)
        ``(reads_in, reads_out)``

    Notes
    -----
    Steps are applied per read in this fixed order: orientation, max_len,
    trunc_q, trim_left/trim_right, trunc_len, min_len, max_n, min_q,
    max_ee, PhiX removal, low-complexity removal.
    """
    reads_in = 0
    reads_out = 0

    # Ensure output directory exists
    out_dir = os.path.dirname(fout)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with _open_fq(fn) as fin, _write_opener(fout, compress) as fout_h:
        for header, seq, sep, qual_str in _fastq_records(fin):
            reads_in += 1

            # --- orient_fwd ---
            # _orient_read returns None when the primer matches in neither
            # orientation; such reads are dropped.
            if orient_fwd is not None:
                result = _orient_read(seq, qual_str, orient_fwd)
                if result is None:
                    continue
                seq, qual_str = result

            # --- max_len (before trimming) ---
            if max_len > 0 and len(seq) > max_len:
                continue

            # --- trunc_q (truncate at first quality <= threshold) ---
            # Note this runs BEFORE trim_left/trim_right (see Notes above).
            if trunc_q is not None:
                quals = _decode_quals(qual_str)
                trunc_pos = len(quals)
                for i, q in enumerate(quals):
                    if q <= trunc_q:
                        trunc_pos = i
                        break
                seq = seq[:trunc_pos]
                qual_str = qual_str[:trunc_pos]

            # --- trim_left / trim_right ---
            if trim_left > 0:
                seq = seq[trim_left:]
                qual_str = qual_str[trim_left:]
            if trim_right > 0 and trim_right < len(seq):
                seq = seq[: len(seq) - trim_right]
                qual_str = qual_str[: len(qual_str) - trim_right]
            elif trim_right > 0:
                # trim_right >= remaining length: nothing survives; the
                # min_len check below then discards the read (unless
                # min_len is 0).
                seq = ""
                qual_str = ""

            # --- trunc_len ---
            if trunc_len > 0:
                if len(seq) < trunc_len:
                    continue
                seq = seq[:trunc_len]
                qual_str = qual_str[:trunc_len]

            # --- min_len (after all trimming) ---
            if len(seq) < min_len:
                continue

            # --- max_n ---
            # A negative max_n disables this check entirely.
            if max_n >= 0 and _count_non_acgt(seq) > max_n:
                continue

            # Decode quality scores for remaining checks
            quals = _decode_quals(qual_str)

            # --- min_q ---
            if min_q > 0 and quals and min(quals) < min_q:
                continue

            # --- max_ee ---
            if max_ee < float("inf"):
                if _expected_errors(quals) > max_ee:
                    continue

            # --- rm_phix ---
            # _is_phix takes a batch list; called here one read at a time.
            if rm_phix:
                phix_match = _is_phix([seq])
                if phix_match[0]:
                    continue

            # --- rm_lowcomplex ---
            if rm_lowcomplex > 0:
                complexity = _seq_complexity([seq])
                if complexity[0] < rm_lowcomplex:
                    continue

            # Write passing read
            fout_h.write(f"{header}\n{seq}\n{sep}\n{qual_str}\n")
            reads_out += 1

    if verbose:
        pct = (reads_out / reads_in * 100) if reads_in > 0 else 0.0
        print(
            f"Read in {reads_in} reads, output {reads_out} "
            f"({pct:.1f}%) filtered reads."
        )

    return reads_in, reads_out

fastq_paired_filter

fastq_paired_filter(fwd, filt_fwd, rev, filt_rev, *, trim_left=(0, 0), trim_right=(0, 0), trunc_len=(0, 0), trunc_q=(None, None), max_len=(0, 0), min_len=(20, 20), max_n=(0, 0), min_q=(0, 0), max_ee=(float('inf'), float('inf')), rm_phix=True, rm_lowcomplex=(0.0, 0.0), orient_fwd=None, compress=True, verbose=False)

Filter and trim paired FASTQ files.

Both reads in a pair must pass all filters for either to be kept, preserving read-pair synchronisation.

Parameters accept scalars (applied to both reads) or length-2 tuples (forward_value, reverse_value).

Parameters

fwd : str Path to forward-read input FASTQ. filt_fwd : str Path to filtered forward-read output FASTQ. rev : str Path to reverse-read input FASTQ. filt_rev : str Path to filtered reverse-read output FASTQ. trim_left, trim_right, trunc_len, trunc_q, max_len, min_len, max_n, min_q, max_ee, rm_lowcomplex See :func:fastq_filter for per-parameter documentation. Each accepts a scalar or a (fwd, rev) tuple. rm_phix : bool Remove reads matching PhiX. orient_fwd : str or None If set, orient forward reads to start with this sequence (reverse reads are reverse-complemented accordingly). compress : bool Gzip-compress output files. verbose : bool Print a summary.

Returns

tuple of (int, int) (reads_in, reads_out)

Source code in papa2/filter.py
def fastq_paired_filter(
    fwd: str,
    filt_fwd: str,
    rev: str,
    filt_rev: str,
    *,
    trim_left: Union[int, Tuple[int, int]] = (0, 0),
    trim_right: Union[int, Tuple[int, int]] = (0, 0),
    trunc_len: Union[int, Tuple[int, int]] = (0, 0),
    trunc_q: Union[None, int, Tuple[Optional[int], Optional[int]]] = (None, None),
    max_len: Union[int, Tuple[int, int]] = (0, 0),
    min_len: Union[int, Tuple[int, int]] = (20, 20),
    max_n: Union[int, Tuple[int, int]] = (0, 0),
    min_q: Union[int, Tuple[int, int]] = (0, 0),
    max_ee: Union[float, Tuple[float, float]] = (float("inf"), float("inf")),
    rm_phix: bool = True,
    rm_lowcomplex: Union[float, Tuple[float, float]] = (0.0, 0.0),
    orient_fwd: Optional[str] = None,
    compress: bool = True,
    verbose: bool = False,
) -> Tuple[int, int]:
    """Filter and trim paired FASTQ files.

    Both reads in a pair must pass **all** filters for either to be kept,
    preserving read-pair synchronisation.

    Parameters accept scalars (applied to both reads) or length-2 tuples
    ``(forward_value, reverse_value)``.

    Per-read checks run in a fixed order: ``max_len`` (on the untrimmed
    read), ``trunc_q``, ``trim_left``/``trim_right``, ``trunc_len``,
    ``min_len``, ``max_n``, ``min_q``, ``max_ee``, ``rm_lowcomplex``.
    The PhiX check runs last, at pair level, on both (trimmed) reads.

    Parameters
    ----------
    fwd : str
        Path to forward-read input FASTQ.
    filt_fwd : str
        Path to filtered forward-read output FASTQ.
    rev : str
        Path to reverse-read input FASTQ.
    filt_rev : str
        Path to filtered reverse-read output FASTQ.
    trim_left, trim_right, trunc_len, trunc_q, max_len, min_len,
    max_n, min_q, max_ee, rm_lowcomplex
        See :func:`fastq_filter` for per-parameter documentation.  Each
        accepts a scalar or a ``(fwd, rev)`` tuple.
    rm_phix : bool
        Remove reads matching PhiX.
    orient_fwd : str or None
        If set, orient forward reads to start with this sequence (reverse
        reads are reverse-complemented accordingly).
    compress : bool
        Gzip-compress output files.
    verbose : bool
        Print a summary.

    Returns
    -------
    tuple of (int, int)
        ``(reads_in, reads_out)``

    Raises
    ------
    ValueError
        If a tuple-valued parameter is not length 2, or if the forward and
        reverse FASTQ files contain different numbers of reads.
    """

    def _as_pair(val, default=None):
        """Normalise a scalar or tuple to a 2-tuple."""
        if isinstance(val, (list, tuple)):
            if len(val) == 2:
                return tuple(val)
            raise ValueError(f"Expected length-2 tuple, got {len(val)}")
        return (val, val)

    # Normalised (forward, reverse) settings for every per-read filter.
    tl = _as_pair(trim_left)
    tr = _as_pair(trim_right)
    tlen = _as_pair(trunc_len)
    tq = _as_pair(trunc_q)
    mxl = _as_pair(max_len)
    mnl = _as_pair(min_len)
    mn = _as_pair(max_n)
    mq = _as_pair(min_q)
    mee = _as_pair(max_ee)
    rlc = _as_pair(rm_lowcomplex)

    reads_in = 0
    reads_out = 0

    # Ensure output directories exist
    for p in (filt_fwd, filt_rev):
        d = os.path.dirname(p)
        if d:
            os.makedirs(d, exist_ok=True)

    # Unique marker so zip_longest can reveal a length mismatch between files.
    _sentinel = object()

    with (
        _open_fq(fwd) as fin_f,
        _open_fq(rev) as fin_r,
        _write_opener(filt_fwd, compress) as fout_f,
        _write_opener(filt_rev, compress) as fout_r,
    ):
        from itertools import zip_longest
        for rec_f, rec_r in zip_longest(
            _fastq_records(fin_f), _fastq_records(fin_r), fillvalue=_sentinel
        ):
            if rec_f is _sentinel or rec_r is _sentinel:
                raise ValueError(
                    f"Forward and reverse FASTQ files have different numbers "
                    f"of reads ({fwd}, {rev}). Files must be synchronized."
                )
            hdr_f, seq_f, sep_f, qual_f = rec_f
            hdr_r, seq_r, sep_r, qual_r = rec_r
            reads_in += 1

            seqs = [seq_f, seq_r]
            quals_str = [qual_f, qual_r]
            keep = True

            # --- orient_fwd (forward read only; RC the reverse accordingly) ---
            if orient_fwd is not None:
                result = _orient_read(seqs[0], quals_str[0], orient_fwd)
                if result is None:
                    # Forward read matched orient_fwd in neither orientation.
                    continue
                if result != (seqs[0], quals_str[0]):
                    # Forward was RC'd -- also RC reverse to maintain pairing
                    seqs[0], quals_str[0] = result
                    seqs[1] = _rc(seqs[1])
                    quals_str[1] = quals_str[1][::-1]

            # Apply the per-read filter chain to forward (0) then reverse (1).
            for idx in range(2):
                if not keep:
                    break

                s = seqs[idx]
                qs = quals_str[idx]

                # --- max_len (before trimming) ---
                if mxl[idx] > 0 and len(s) > mxl[idx]:
                    keep = False
                    break

                # --- trunc_q: cut before the first base with quality <= threshold ---
                if tq[idx] is not None:
                    decoded = _decode_quals(qs)
                    trunc_pos = len(decoded)
                    for i, q in enumerate(decoded):
                        if q <= tq[idx]:
                            trunc_pos = i
                            break
                    s = s[:trunc_pos]
                    qs = qs[:trunc_pos]

                # --- trim_left / trim_right ---
                if tl[idx] > 0:
                    s = s[tl[idx]:]
                    qs = qs[tl[idx]:]
                if tr[idx] > 0 and tr[idx] < len(s):
                    s = s[: len(s) - tr[idx]]
                    qs = qs[: len(qs) - tr[idx]]
                elif tr[idx] > 0:
                    # trim_right consumes the whole read; min_len rejects it below.
                    s = ""
                    qs = ""

                # --- trunc_len (reads shorter than the target are discarded) ---
                if tlen[idx] > 0:
                    if len(s) < tlen[idx]:
                        keep = False
                        break
                    s = s[: tlen[idx]]
                    qs = qs[: tlen[idx]]

                # --- min_len ---
                if len(s) < mnl[idx]:
                    keep = False
                    break

                # --- max_n (count of non-ACGT bases) ---
                if mn[idx] >= 0 and _count_non_acgt(s) > mn[idx]:
                    keep = False
                    break

                # Qualities of the read as trimmed so far.
                decoded = _decode_quals(qs)

                # --- min_q ---
                if mq[idx] > 0 and decoded and min(decoded) < mq[idx]:
                    keep = False
                    break

                # --- max_ee (expected errors over the trimmed qualities) ---
                if mee[idx] < float("inf"):
                    if _expected_errors(decoded) > mee[idx]:
                        keep = False
                        break

                # --- rm_lowcomplex ---
                if rlc[idx] > 0:
                    complexity = _seq_complexity([s])
                    if complexity[0] < rlc[idx]:
                        keep = False
                        break

                # Commit the trimmed read back before processing the mate.
                seqs[idx] = s
                quals_str[idx] = qs

            if not keep:
                continue

            # --- rm_phix (check both reads) ---
            if rm_phix:
                phix_flags = _is_phix(seqs)
                if phix_flags.any():
                    continue

            # Write passing pair
            fout_f.write(f"{hdr_f}\n{seqs[0]}\n{sep_f}\n{quals_str[0]}\n")
            fout_r.write(f"{hdr_r}\n{seqs[1]}\n{sep_r}\n{quals_str[1]}\n")
            reads_out += 1

    if verbose:
        pct = (reads_out / reads_in * 100) if reads_in > 0 else 0.0
        print(
            f"Read in {reads_in} paired-reads, output {reads_out} "
            f"({pct:.1f}%) filtered paired-reads."
        )

    return reads_in, reads_out

papa2.error — Error Models

Functions for estimating and manipulating the DADA2 error rate matrix.

loess_errfun

loess_errfun(trans)

Estimate error rates from transition counts using LOESS smoothing.

Mirrors R's loessErrfun from errorModels.R.

Parameters:

Name Type Description Default
trans

numpy array (16, ncol), transition counts. Rows are transitions (A2A,A2C,...,T2T), columns are quality scores.

required

Returns:

Type Description

numpy array (16, ncol), estimated error rates.

Source code in papa2/error.py
def loess_errfun(trans):
    """Estimate error rates from transition counts using LOESS smoothing.

    Mirrors R's loessErrfun from errorModels.R.

    Args:
        trans: numpy array (16, ncol), transition counts. Rows are transitions
               (A2A,A2C,...,T2T), columns are quality scores.

    Returns:
        numpy array (16, ncol), estimated error rates.
    """
    MIN_ERROR_RATE = 1e-7
    MAX_ERROR_RATE = 0.25

    ncol = trans.shape[1]
    quals = np.arange(ncol, dtype=np.float64)

    def _fit_one(counts, totals):
        """LOESS-smooth one off-diagonal transition in log10 space."""
        # Pseudocount of 1 keeps log10 defined where the count is zero.
        with np.errstate(divide="ignore", invalid="ignore"):
            logp = np.log10((counts + 1.0) / totals)
        logp[~np.isfinite(logp)] = np.nan

        have = ~np.isnan(logp)
        if have.sum() < 2:
            # Too few observed quality scores to smooth -- floor the row.
            return np.full(ncol, MIN_ERROR_RATE)

        weights = np.maximum(totals[have], 1.0)
        smoothed = _lowess_fit(quals[have], logp[have], weights,
                               eval_x=quals, span=0.75)

        # Extend the boundary predictions over the unobserved edges.
        obs_idx = np.where(have)[0]
        smoothed[:obs_idx[0]] = smoothed[obs_idx[0]]
        smoothed[obs_idx[-1] + 1:] = smoothed[obs_idx[-1]]

        # Back-transform from log10.
        return 10.0 ** smoothed

    # Fit the 12 off-diagonal transitions in row-major order
    # (A2C,A2G,A2T, C2A,C2G,C2T, G2A,G2C,G2T, T2A,T2C,T2G).
    est = np.zeros((12, ncol))
    k = 0
    for nti in range(4):  # A, C, G, T
        totals = trans[_BASE_ROWS[nti]].sum(axis=0).astype(np.float64)
        for ntj in range(4):
            if ntj == nti:
                continue
            est[k] = _fit_one(trans[nti * 4 + ntj].astype(np.float64), totals)
            k += 1

    est = np.clip(est, MIN_ERROR_RATE, MAX_ERROR_RATE)

    # Assemble the full 16-row matrix; each self-transition is the
    # complement of its three off-diagonal rates.
    err = np.zeros((16, ncol))
    k = 0
    for nti in range(4):
        for ntj in range(4):
            if ntj == nti:
                continue
            err[nti * 4 + ntj] = est[k]
            k += 1
        err[nti * 4 + nti] = 1.0 - est[nti * 3:(nti + 1) * 3].sum(axis=0)

    return err

noqual_errfun

noqual_errfun(trans)

Error estimation ignoring quality scores (constant across Q).

Source code in papa2/error.py
def noqual_errfun(trans):
    """Error estimation ignoring quality scores (constant across Q)."""
    ncol = trans.shape[1]
    err = np.zeros((16, ncol))
    for nti in range(4):
        tot = trans[_BASE_ROWS[nti]].sum()
        rows = [nti * 4 + ntj for ntj in range(4)]
        if tot == 0:
            # No observations of this base at all: fall back to uniform rates.
            for row in rows:
                err[row] = 0.25
        else:
            # Pooled rate over all quality scores, broadcast across columns.
            for row in rows:
                err[row] = trans[row].sum() / tot
    return err

inflate_err

inflate_err(err, inflation)

Inflate error rates by a factor (prevents premature convergence).

Mirrors R's inflateErr.

Source code in papa2/error.py
def inflate_err(err, inflation):
    """Inflate error rates by a factor (prevents premature convergence).

    Mirrors R's inflateErr.
    """
    # Odds-style inflation keeps the result inside [0, 1] for rates in [0, 1];
    # the clip guards against numerical spill at the boundaries.
    numer = err * inflation
    denom = 1.0 + (inflation - 1.0) * err
    return np.clip(numer / denom, 0.0, 1.0)

pacbio_errfun

pacbio_errfun(trans)

PacBio-specific error function.

Mirrors R's PacBioErrfun from errorModels.R.

Quality score 93 (the last column, if present) is handled separately using a simple MLE with pseudocount: (count + 1) / (total + 4). All other quality scores are fit using loess_errfun.

If Q93 is not present (i.e. the transition matrix does not extend to quality 93), the function falls back entirely to loess_errfun.

Parameters:

Name Type Description Default
trans

numpy array (16, ncol), transition counts.

required

Returns:

Type Description

numpy array (16, ncol), estimated error rates.

Source code in papa2/error.py
def pacbio_errfun(trans):
    """PacBio-specific error function.

    Mirrors R's PacBioErrfun from errorModels.R.

    Quality score 93 (the last column, if present) is handled separately
    using a simple MLE with pseudocount: ``(count + 1) / (total + 4)``.
    All other quality scores are fit using ``loess_errfun``.

    If Q93 is not present (i.e. the transition matrix does not extend to
    quality 93), the function falls back entirely to ``loess_errfun``.

    Args:
        trans: numpy array (16, ncol), transition counts.

    Returns:
        numpy array (16, ncol), estimated error rates.
    """
    ncol = trans.shape[1]
    # Q93 is present if the matrix extends to column index 93
    # (columns are 0-indexed quality scores)
    has_q93 = ncol > 93

    if not has_q93:
        return loess_errfun(trans)

    # Separate Q93 column (last meaningful column at index 93)
    # NOTE(review): if ncol > 94, deleting column 93 shifts any higher-Q
    # columns one position left during the loess fit -- confirm inputs
    # never extend beyond Q93.
    q93_col = 93
    trans_no93 = np.delete(trans, q93_col, axis=1)

    # Fit non-Q93 columns with loess
    err = loess_errfun(trans_no93)

    # Re-insert Q93 column (zeros here; overwritten in the loop below)
    err_full = np.insert(err, q93_col, 0.0, axis=1)

    # Compute Q93 error rates with MLE + pseudocount
    for nti in range(4):
        base_rows = _BASE_ROWS[nti]
        # Total observations of base nti at Q93.
        tot = sum(trans[row, q93_col] for row in base_rows)
        for ntj in range(4):
            row = nti * 4 + ntj
            if nti == ntj:
                continue
            count = trans[row, q93_col]
            # Pseudocount MLE: (count + 1) / (total + 4).
            err_full[row, q93_col] = (count + 1.0) / (tot + 4.0)
        # Self-transition: 1 - sum(off-diagonal)
        off_diag_sum = sum(
            err_full[nti * 4 + ntj, q93_col]
            for ntj in range(4) if ntj != nti
        )
        err_full[nti * 4 + nti, q93_col] = 1.0 - off_diag_sum

    # Trim to original ncol if the loess result had fewer columns
    # (defensive; after the insert above the widths already match).
    if err_full.shape[1] > ncol:
        err_full = err_full[:, :ncol]

    return err_full

make_binned_qual_errfun

make_binned_qual_errfun(binned_q)

Return an error function that uses piecewise linear interpolation.

Mirrors R's dada2:::make.binned.qual.errfun.

Parameters:

Name Type Description Default
binned_q

array-like of quality score bin boundaries (sorted, ascending). These are the quality score values at which the error rates are "known" (from the transition matrix columns). Between bins, error rates are linearly interpolated.

required

Returns:

Type Description

A callable errfun(trans) that accepts a 16-row transition

matrix and returns a 16-row error rate matrix, using piecewise

linear interpolation between the binned quality scores.

Source code in papa2/error.py
def make_binned_qual_errfun(binned_q):
    """Return an error function that uses piecewise linear interpolation.

    Mirrors R's ``dada2:::make.binned.qual.errfun``.

    Args:
        binned_q: array-like of quality score bin boundaries (sorted,
            ascending).  These are the quality score values at which
            the error rates are "known" (from the transition matrix
            columns).  Between bins, error rates are linearly interpolated.

    Returns:
        A callable ``errfun(trans)`` that accepts a 16-row transition
        matrix and returns a 16-row error rate matrix, using piecewise
        linear interpolation between the binned quality scores.
    """
    binned_q = np.asarray(binned_q, dtype=np.float64)

    MIN_ERROR_RATE = 1e-7
    MAX_ERROR_RATE = 0.25

    # All 12 off-diagonal (from-base, to-base) pairs in row-major order:
    # A2C, A2G, A2T, C2A, C2G, C2T, G2A, G2C, G2T, T2A, T2C, T2G.
    transitions = [(i, j) for i in range(4) for j in range(4) if i != j]

    def errfun(trans):
        """Binned quality error function with piecewise linear interpolation.

        Args:
            trans: numpy array (16, ncol), transition counts.

        Returns:
            numpy array (16, ncol), estimated error rates.
        """
        ncol = trans.shape[1]
        all_q = np.arange(ncol, dtype=np.float64)
        err = np.zeros((16, ncol), dtype=np.float64)

        for nti, ntj in transitions:
            row = nti * 4 + ntj
            tot = trans[_BASE_ROWS[nti]].sum(axis=0).astype(np.float64)

            # Observed per-quality error rate; undefined cells become 0.
            with np.errstate(divide="ignore", invalid="ignore"):
                observed = trans[row].astype(np.float64) / tot
            observed[~np.isfinite(observed)] = 0.0

            # Sample the observed rate at each in-range bin position.
            anchors = [int(round(bq)) for bq in binned_q]
            anchors = [a for a in anchors if 0 <= a < ncol]

            if len(anchors) < 2:
                # Not enough anchor points to interpolate -- use the flat
                # observed rates directly.
                err[row, :] = np.clip(observed, MIN_ERROR_RATE, MAX_ERROR_RATE)
                continue

            xs = np.array(anchors, dtype=np.float64)
            ys = np.array([observed[a] for a in anchors], dtype=np.float64)

            # Piecewise linear interpolation across all quality scores,
            # clamped to the sane rate range.
            err[row, :] = np.clip(np.interp(all_q, xs, ys),
                                  MIN_ERROR_RATE, MAX_ERROR_RATE)

        # Self-transitions: complement of the three off-diagonal rates.
        for nti in range(4):
            off_rows = [nti * 4 + ntj for ntj in range(4) if ntj != nti]
            err[nti * 4 + nti, :] = 1.0 - sum(err[r, :] for r in off_rows)

        return err

    return errfun

get_initial_err

get_initial_err(ncol=41)

Get a maximum-possible initial error matrix (all 1.0, matching R).

Source code in papa2/error.py
def get_initial_err(ncol=41):
    """Get a maximum-possible initial error matrix (all 1.0, matching R)."""
    # One row per transition (16), one column per quality score.
    return np.ones((16, ncol))

papa2.paired — Paired-End Merging

High-level paired-read merging, mirroring R's mergePairs().

merge_pairs

merge_pairs(dadaF, derepF, dadaR, derepR, min_overlap=12, max_mismatch=0, trim_overhang=False, just_concatenate=False, verbose=False)

Merge denoised forward and reverse reads into full amplicon sequences.

Mirrors the R dada2::mergePairs() function.

Parameters:

Name Type Description Default
dadaF

dict from dada() with keys 'denoised', 'cluster_seqs', 'cluster_abunds', 'map' (0-indexed cluster assignment per unique).

required
derepF

dict from derep_fastq() with key 'map' (0-indexed unique assignment per read).

required
dadaR

same structure as dadaF, for reverse reads.

required
derepR

same structure as derepF, for reverse reads.

required
min_overlap

minimum overlap required for merging (default 12).

12
max_mismatch

maximum mismatches allowed in the overlap (default 0).

0
trim_overhang

if True, trim overhanging ends of the merged sequence (default False).

False
just_concatenate

if True, concatenate rather than merge (inserts 10 Ns between forward and reverse; default False).

False
verbose

print progress information (default False).

False

Returns:

Type Description

List of dicts sorted by abundance (descending), each with keys: 'sequence': merged sequence (str) 'abundance': number of reads supporting this pair 'forward': 0-indexed forward cluster index 'reverse': 0-indexed reverse cluster index 'nmatch': number of matches in overlap 'nmismatch': number of mismatches in overlap 'nindel': number of indels in overlap 'accept': whether the pair passed the overlap/mismatch criteria

Source code in papa2/paired.py
def merge_pairs(dadaF, derepF, dadaR, derepR,
                min_overlap=12, max_mismatch=0,
                trim_overhang=False, just_concatenate=False,
                verbose=False):
    """Merge denoised forward and reverse reads into full amplicon sequences.

    Mirrors the R dada2::mergePairs() function.

    Args:
        dadaF: dict from dada() with keys 'denoised', 'cluster_seqs',
               'cluster_abunds', 'map' (0-indexed cluster assignment per unique).
        derepF: dict from derep_fastq() with key 'map' (0-indexed unique
                assignment per read).
        dadaR: same structure as dadaF, for reverse reads.
        derepR: same structure as derepF, for reverse reads.
        min_overlap: minimum overlap required for merging (default 12).
        max_mismatch: maximum mismatches allowed in the overlap (default 0).
        trim_overhang: if True, trim overhanging ends of the merged sequence
                       (default False).
        just_concatenate: if True, concatenate rather than merge (inserts 10 Ns
                          between forward and reverse; default False).
        verbose: print progress information (default False).

    Returns:
        List of dicts sorted by abundance (descending), each with keys:
            'sequence': merged sequence (str)
            'abundance': number of reads supporting this pair
            'forward': 0-indexed forward cluster index
            'reverse': 0-indexed reverse cluster index
            'nmatch': number of matches in overlap
            'nmismatch': number of mismatches in overlap
            'nindel': number of indels in overlap
            'accept': whether the pair passed the overlap/mismatch criteria
    """
    # Compose read->unique (derep) with unique->cluster (dada, -1 = unassigned)
    # to get each read's cluster on both sides.
    fwd_cluster_of_read = np.asarray(dadaF["map"], dtype=np.int32)[
        np.asarray(derepF["map"], dtype=np.int32)
    ]
    rev_cluster_of_read = np.asarray(dadaR["map"], dtype=np.int32)[
        np.asarray(derepR["map"], dtype=np.int32)
    ]

    # Tally reads per unique (forward, reverse) cluster pair, keeping only
    # reads assigned to a cluster on both sides.
    both = (fwd_cluster_of_read >= 0) & (rev_cluster_of_read >= 0)
    pair_counts = Counter(
        zip(fwd_cluster_of_read[both].tolist(),
            rev_cluster_of_read[both].tolist())
    )

    if verbose:
        print(f"Found {len(pair_counts)} unique F/R cluster pairs from "
              f"{sum(pair_counts.values())} reads.")

    fwd_seqs = dadaF["cluster_seqs"]
    rev_seqs = dadaR["cluster_seqs"]

    # With max_mismatch == 0, huge penalties force the NW alignment to place
    # the overlap with effectively zero tolerance; otherwise score more
    # permissively so mismatches can survive in the alignment.
    if max_mismatch == 0:
        nw_match, nw_mismatch, nw_gap = 1, -64, -64
    else:
        nw_match, nw_mismatch, nw_gap = 1, -8, -8

    results = []
    for (fi, ri), abundance in pair_counts.items():
        fwd_seq = fwd_seqs[fi]
        rev_seq_rc = rc(rev_seqs[ri])

        if just_concatenate:
            # No alignment: join with a 10-N spacer and accept unconditionally.
            results.append({
                "sequence": fwd_seq + "N" * 10 + rev_seq_rc,
                "abundance": abundance,
                "forward": fi,
                "reverse": ri,
                "nmatch": 0,
                "nmismatch": 0,
                "nindel": 0,
                "accept": True,
            })
            continue

        # Ends-free NW alignment of forward vs reverse-complemented reverse.
        aln_f, aln_r = nwalign(fwd_seq, rev_seq_rc,
                               match=nw_match, mismatch=nw_mismatch,
                               gap_p=nw_gap, band=-1)
        nmatch, nmismatch, nindel = eval_pair(aln_f, aln_r)

        accept = nmatch >= min_overlap and (nmismatch + nindel) <= max_mismatch

        if accept:
            # Consensus over the overlap, preferring the forward base.
            merged = pair_consensus(aln_f, aln_r, prefer=1,
                                    trim_overhang=trim_overhang)
        else:
            merged = ""
            if verbose:
                print(f"Pair F{fi}/R{ri} (abd={abundance}): "
                      f"{nmatch} match, {nmismatch} mismatch, {nindel} indel -- REJECTED")

        results.append({
            "sequence": merged,
            "abundance": abundance,
            "forward": fi,
            "reverse": ri,
            "nmatch": nmatch,
            "nmismatch": nmismatch,
            "nindel": nindel,
            "accept": accept,
        })

    # Most-abundant pairs first (stable sort keeps discovery order on ties).
    results.sort(key=lambda r: r["abundance"], reverse=True)

    if verbose:
        n_accept = sum(1 for r in results if r["accept"])
        n_total = len(results)
        total_reads = sum(r["abundance"] for r in results if r["accept"])
        print(f"Accepted {n_accept}/{n_total} unique pairs, "
              f"representing {total_reads} merged reads.")

    return results

papa2.chimera — Chimera Removal

Chimera detection and removal functions matching R's dada2 interface.

remove_bimera_denovo

remove_bimera_denovo(seqtab, method='consensus', min_fold=1.5, min_abund=2, allow_one_off=False, min_one_off_par_dist=4, min_sample_fraction=0.9, ignore_n_negatives=1, match=5, mismatch=-4, gap_p=-8, max_shift=16, verbose=False)

Remove bimeric sequences from a sequence table.

Mirrors R's removeBimeraDenovo.

Parameters:

Name Type Description Default
seqtab

dict with keys: "table": numpy int32 array (samples x ASVs), column-major preferred. "seqs": list of ASV sequences (str, ACGT), length = ncol.

required
method

"consensus" (default), "pooled", or "per-sample". - "consensus": flag per-sample, then remove ASVs flagged in enough samples (controlled by min_sample_fraction and ignore_n_negatives). - "pooled": sum across samples, treat as single sample. - "per-sample": zero only the sample/ASV cells flagged as chimeric, then drop all-zero ASV columns.

'consensus'
min_fold

parent fold-abundance threshold.

1.5
min_abund

parent minimum absolute abundance.

2
allow_one_off

allow one mismatch in chimera model.

False
min_one_off_par_dist

min hamming distance for one-off parents.

4
min_sample_fraction

fraction of present samples that must flag chimeric for consensus removal (default 0.9).

0.9
ignore_n_negatives

number of non-flagging samples to discount when applying min_sample_fraction (default 1). An ASV is removed if nflag > 0 and either nflag >= nsam, or nflag >= (nsam - ignore_n_negatives) * min_sample_fraction.

1
match, mismatch, gap_p, max_shift

NW alignment parameters.

required
verbose

print progress information.

False

Returns:

Type Description

dict with: "table": numpy int32 array with chimeric columns removed. "seqs": list of non-chimeric ASV sequences. "is_chimera": numpy bool array (ncol,) for "pooled"/"consensus", or numpy bool array (nrow, ncol) for "per-sample".

Source code in papa2/chimera.py
def remove_bimera_denovo(seqtab, method="consensus", min_fold=1.5,
                         min_abund=2, allow_one_off=False,
                         min_one_off_par_dist=4, min_sample_fraction=0.9,
                         ignore_n_negatives=1,
                         match=5, mismatch=-4, gap_p=-8, max_shift=16,
                         verbose=False):
    """Remove bimeric sequences from a sequence table.

    Mirrors R's removeBimeraDenovo.

    Args:
        seqtab: dict with keys:
            "table": numpy int32 array (samples x ASVs), column-major preferred.
            "seqs": list of ASV sequences (str, ACGT), length = ncol.
        method: "consensus" (default), "pooled", or "per-sample".
            - "consensus": flag per-sample, then remove ASVs flagged in
              enough samples (controlled by min_sample_fraction and
              ignore_n_negatives).
            - "pooled": sum across samples, treat as single sample.
            - "per-sample": zero only the sample/ASV cells flagged as
              chimeric, then drop all-zero ASV columns.
        min_fold: parent fold-abundance threshold.
        min_abund: parent minimum absolute abundance.
        allow_one_off: allow one mismatch in chimera model.
        min_one_off_par_dist: min hamming distance for one-off parents.
        min_sample_fraction: fraction used in the consensus threshold
            (default 0.9); see ignore_n_negatives.
        ignore_n_negatives: number of non-flagging samples to discount
            when applying min_sample_fraction (default 1).  Under
            "consensus", an ASV is removed if nflag > 0 and either
            nflag >= nsam, or
            nflag >= (nsam - ignore_n_negatives) * min_sample_fraction.
        match, mismatch, gap_p, max_shift: NW alignment parameters.
        verbose: print progress information.

    Returns:
        dict with:
            "table": numpy int32 array with chimeric columns removed.
            "seqs": list of non-chimeric ASV sequences.
            "is_chimera": numpy bool array (ncol,) for "pooled"/"consensus",
              or numpy bool array (nrow, ncol) for "per-sample".

    Raises:
        TypeError: if seqtab is not a dict.
        ValueError: if len(seqs) != ncol, or method is unrecognized.
    """
    if isinstance(seqtab, dict):
        mat = seqtab["table"]
        seqs = list(seqtab["seqs"])
    else:
        raise TypeError("seqtab must be a dict with 'table' and 'seqs' keys")

    # Coerce to a 2-D int32 matrix (a single row is treated as one sample).
    mat = np.asarray(mat, dtype=np.int32)
    if mat.ndim == 1:
        mat = mat.reshape(1, -1)
    nrow, ncol = mat.shape

    if ncol != len(seqs):
        raise ValueError("Number of sequences ({}) must match number of "
                         "columns ({})".format(len(seqs), ncol))

    if method == "pooled":
        # Sum across samples, treat as single-sample
        pooled = mat.sum(axis=0).reshape(1, -1).astype(np.int32)
        flags = is_bimera_denovo(
            pooled[0], seqs,
            allow_one_off=allow_one_off,
            min_one_off_par_dist=min_one_off_par_dist,
            min_fold=min_fold, min_abund=min_abund,
            match=match, mismatch=mismatch, gap_p=gap_p, max_shift=max_shift
        )
        is_chimera = flags
    elif method == "consensus":
        # Use the table-level C function for consensus detection
        mat_f = np.asfortranarray(mat, dtype=np.int32)
        result = table_bimera(
            mat_f, seqs,
            min_fold=min_fold, min_abund=min_abund,
            allow_one_off=allow_one_off,
            min_one_off_par_dist=min_one_off_par_dist,
            match=match, mismatch=mismatch, gap_p=gap_p, max_shift=max_shift
        )
        # nflag[j]: samples flagging ASV j; nsam[j]: samples where j appears.
        nflag = result["nflag"]
        nsam = result["nsam"]

        # Apply consensus logic (matching R's isBimeraDenovoTable)
        # R logic: flag if nflag > 0 AND
        #   (nflag >= nsam OR nflag >= (nsam - ignoreNNegatives) * minSampleFraction)
        is_chimera = np.zeros(ncol, dtype=bool)
        for j in range(ncol):
            if nflag[j] <= 0 or nsam[j] <= 0:
                continue
            if nflag[j] >= nsam[j]:
                is_chimera[j] = True
            elif nflag[j] >= (nsam[j] - ignore_n_negatives) * min_sample_fraction:
                is_chimera[j] = True
    elif method == "per-sample":
        # Flag and zero cells one sample at a time; each sample is evaluated
        # against its already-zeroed row (new_mat), so earlier removals in the
        # same row do not feed parent abundances.
        per_sample = np.zeros((nrow, ncol), dtype=bool)
        new_mat = mat.copy()
        for i in range(nrow):
            flags = is_bimera_denovo(
                new_mat[i], seqs,
                allow_one_off=allow_one_off,
                min_one_off_par_dist=min_one_off_par_dist,
                min_fold=min_fold, min_abund=min_abund,
                match=match, mismatch=mismatch, gap_p=gap_p, max_shift=max_shift
            )
            per_sample[i] = flags
            new_mat[i, flags] = 0

        # Drop ASVs that are now absent from every sample.
        keep = new_mat.sum(axis=0) > 0
        new_mat = new_mat[:, keep]
        new_seqs = [s for s, k in zip(seqs, keep) if k]
        return {
            "table": new_mat,
            "seqs": new_seqs,
            "is_chimera": per_sample,
        }
    else:
        raise ValueError("method must be 'consensus', 'pooled', or 'per-sample'")

    if verbose:
        n_chim = int(is_chimera.sum())
        n_total = ncol
        print("Identified {} bimeras out of {} input sequences.".format(
            n_chim, n_total))

    # Filter out chimeric columns
    keep = ~is_chimera
    new_mat = mat[:, keep].copy()
    new_seqs = [s for s, k in zip(seqs, keep) if k]

    return {
        "table": new_mat,
        "seqs": new_seqs,
        "is_chimera": is_chimera,
    }

is_bimera_denovo

is_bimera_denovo(seqtab_row, seqs, allow_one_off=False, min_one_off_par_dist=4, min_fold=1.5, min_abund=2, match=5, mismatch=-4, gap_p=-8, max_shift=16)

Check whether a single sequence is a bimera of more-abundant parents.

This mirrors R's isBimeraDenovo for a single sample (one row of the sequence table).

Parameters:

Name Type Description Default
seqtab_row

1-D array-like of int abundances, one per ASV.

required
seqs

list of ASV sequences (str, ACGT), same length as seqtab_row.

required
allow_one_off

allow one mismatch in chimera model.

False
min_one_off_par_dist

min hamming distance between parents for one-off.

4
min_fold

parent must be this-fold more abundant than the query.

1.5
min_abund

parent minimum absolute abundance.

2
match, mismatch, gap_p, max_shift

NW alignment parameters.

required

Returns:

Type Description

numpy bool array (n_seqs,): True where ASV is flagged as bimera.

Source code in papa2/chimera.py
def is_bimera_denovo(seqtab_row, seqs, allow_one_off=False,
                     min_one_off_par_dist=4, min_fold=1.5, min_abund=2,
                     match=5, mismatch=-4, gap_p=-8, max_shift=16):
    """Check whether a single sequence is a bimera of more-abundant parents.

    This mirrors R's isBimeraDenovo for a single sample (one row of the
    sequence table).

    Args:
        seqtab_row: 1-D array-like of int abundances, one per ASV.
        seqs: list of ASV sequences (str, ACGT), same length as seqtab_row.
        allow_one_off: allow one mismatch in chimera model.
        min_one_off_par_dist: min hamming distance between parents for one-off.
        min_fold: parent must be this-fold more abundant than the query.
        min_abund: parent minimum absolute abundance.
        match, mismatch, gap_p, max_shift: NW alignment parameters.

    Returns:
        numpy bool array (n_seqs,): True where ASV is flagged as bimera.
    """
    row = np.asarray(seqtab_row, dtype=np.int32)
    n = len(seqs)
    if n == 0:
        return np.array([], dtype=bool)

    # Sort by decreasing abundance for parent ordering.
    order = np.argsort(-row)
    flags = np.zeros(n, dtype=bool)

    for idx in range(n):
        j = order[idx]
        if row[j] <= 0:
            # Zero-abundance ASVs cannot be bimeras (and cannot be parents).
            continue
        # Gather candidate parents: sequences strictly more than min_fold
        # times as abundant as the query, with abundance >= min_abund.
        threshold = min_fold * row[j]
        parents = []
        for k in order:
            if row[k] <= threshold:
                # `order` is sorted by decreasing abundance, so no later
                # candidate can exceed the fold threshold either.
                break
            if row[k] >= min_abund:
                parents.append(seqs[k])
        if len(parents) == 0:
            continue
        flags[j] = is_bimera(seqs[j], parents,
                             allow_one_off=allow_one_off,
                             min_one_off_par_dist=min_one_off_par_dist,
                             match=match, mismatch=mismatch,
                             gap_p=gap_p, max_shift=max_shift)
    return flags

papa2.taxonomy — Taxonomic Classification

Bayesian k-mer classifier for taxonomic assignment.

assign_taxonomy

assign_taxonomy(seqs, ref_fasta, min_boot=50, try_rc=False, output_bootstraps=False, tax_levels=('Kingdom', 'Phylum', 'Class', 'Order', 'Family', 'Genus', 'Species'), verbose=False)

Classify sequences against a reference taxonomy using the RDP Naive Bayesian Classifier method.

This is a Python port of dada2's assignTaxonomy() R function.

Parameters:

Name Type Description Default
seqs

Query sequences -- any type accepted by :func:papa2.utils.get_sequences (list of strings, FASTA path, pandas DataFrame, dict, etc.).

required
ref_fasta str

Path to a reference FASTA file (may be gzipped). Headers must contain semicolon-delimited taxonomy. UNITE-formatted databases are detected and parsed automatically.

required
min_boot int

Minimum bootstrap confidence (0--100) for retaining a taxonomic assignment at each rank. Ranks below this threshold are set to None.

50
try_rc bool

If True, sequences that fail to classify on the forward strand are re-tried on the reverse complement.

False
output_bootstraps bool

If True, return a dict with keys "tax" (the taxonomy DataFrame) and "boot" (a DataFrame of bootstrap values).

False
tax_levels Sequence[str]

Column names for the taxonomic ranks in the output DataFrame. Must match the number of semicolon-delimited levels in the reference database headers (shorter reference taxonomies are padded with None).

('Kingdom', 'Phylum', 'Class', 'Order', 'Family', 'Genus', 'Species')
verbose bool

Print progress messages.

False

Returns:

Name Type Description
A Union[DataFrame, Dict[str, DataFrame]]

A pandas.DataFrame with query sequences as the index and
tax_levels as columns. If output_bootstraps is True,
returns a dict {"tax": <DataFrame>, "boot": <DataFrame>}.

Source code in papa2/taxonomy.py
def assign_taxonomy(
    seqs,
    ref_fasta: str,
    min_boot: int = 50,
    try_rc: bool = False,
    output_bootstraps: bool = False,
    tax_levels: Sequence[str] = (
        "Kingdom", "Phylum", "Class", "Order", "Family", "Genus", "Species",
    ),
    verbose: bool = False,
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """Classify sequences against a reference taxonomy using the RDP
    Naive Bayesian Classifier method.

    This is a Python port of dada2's ``assignTaxonomy()`` R function.

    Args:
        seqs: Query sequences -- any type accepted by
            :func:`papa2.utils.get_sequences` (list of strings, FASTA path,
            pandas DataFrame, dict, etc.).
        ref_fasta: Path to a reference FASTA file (may be gzipped).  Headers
            must contain semicolon-delimited taxonomy.  UNITE-formatted
            databases are detected and parsed automatically.
        min_boot: Minimum bootstrap confidence (0--100) for retaining a
            taxonomic assignment at each rank.  Ranks below this threshold
            are set to ``None``.
        try_rc: If ``True``, sequences that fail to classify on the forward
            strand are re-tried on the reverse complement.
        output_bootstraps: If ``True``, return a dict with keys ``"tax"``
            (the taxonomy DataFrame) and ``"boot"`` (a DataFrame of bootstrap
            values).
        tax_levels: Column names for the taxonomic ranks in the output
            DataFrame.  Must match the number of semicolon-delimited levels
            in the reference database headers (shorter reference taxonomies
            are padded with ``None``).
        verbose: Print progress messages.

    Returns:
        A :class:`pandas.DataFrame` with query sequences as the index and
        ``tax_levels`` as columns.  If *output_bootstraps* is ``True``,
        returns a dict ``{"tax": <DataFrame>, "boot": <DataFrame>}``.
    """
    from ._cdada import run_taxonomy

    # ------------------------------------------------------------------
    # 1. Get query sequences
    # ------------------------------------------------------------------
    query_seqs = get_sequences(seqs)
    n_seqs = len(query_seqs)
    n_levels = len(tax_levels)

    if verbose:
        logger.info("assign_taxonomy: classifying %d sequences against %s",
                     n_seqs, ref_fasta)

    # ------------------------------------------------------------------
    # 2. Read and parse the reference database
    # ------------------------------------------------------------------
    ref_records = _parse_ref_fasta(ref_fasta)
    headers = [hdr for hdr, _ in ref_records]
    ref_sequences = [seq for _, seq in ref_records]

    unite = _is_unite(headers)
    if verbose:
        logger.info("assign_taxonomy: UNITE format detected: %s", unite)

    # Extract raw taxonomy strings
    if unite:
        raw_taxa = [_parse_taxonomy_unite(h) for h in headers]
    else:
        raw_taxa = [_parse_taxonomy_standard(h) for h in headers]

    # ------------------------------------------------------------------
    # 3. Split into levels, pad to n_levels
    # ------------------------------------------------------------------
    # Each tax string is "K;P;C;O;F;G;S" -- split on ';', stripping empty
    # trailing entries.
    split_taxa: List[List[str]] = []
    for ts in raw_taxa:
        parts = [p.strip() for p in ts.split(";") if p.strip()]
        # Pad with sentinel if shorter than n_levels
        while len(parts) < n_levels:
            parts.append(_UNSPECIFIED)
        # Truncate if longer
        parts = parts[:n_levels]
        split_taxa.append(parts)

    # ------------------------------------------------------------------
    # 4. Build integer maps for the C function
    # ------------------------------------------------------------------
    # A "genus" in dada2 terminology is a unique full taxonomy string
    # (all levels concatenated).  Build a mapping from each reference
    # sequence to a genus index, and a matrix mapping each genus to
    # integer-encoded level values.

    # First, collect unique level values per rank.
    level_to_int: List[Dict[str, int]] = [dict() for _ in range(n_levels)]
    for taxa in split_taxa:
        for lvl_idx, val in enumerate(taxa):
            if val not in level_to_int[lvl_idx]:
                level_to_int[lvl_idx][val] = len(level_to_int[lvl_idx])

    # Build the "genus" key as a tuple of all level values.
    genus_map: Dict[Tuple[str, ...], int] = {}
    ref_to_genus = np.empty(len(ref_sequences), dtype=np.int32)

    for ref_idx, taxa in enumerate(split_taxa):
        key = tuple(taxa)
        if key not in genus_map:
            genus_map[key] = len(genus_map)
        ref_to_genus[ref_idx] = genus_map[key]

    n_genus = len(genus_map)

    # genusmat: (n_genus, n_levels), integer-encoded level values.
    genusmat = np.zeros((n_genus, n_levels), dtype=np.int32)
    for key, gidx in genus_map.items():
        for lvl_idx, val in enumerate(key):
            genusmat[gidx, lvl_idx] = level_to_int[lvl_idx][val]

    # Reverse lookup: int -> string for each level.
    int_to_level: List[Dict[int, str]] = []
    for lvl_idx in range(n_levels):
        int_to_level.append({v: k for k, v in level_to_int[lvl_idx].items()})

    # Reverse lookup: genus index -> taxonomy tuple.
    idx_to_genus: Dict[int, Tuple[str, ...]] = {v: k for k, v in genus_map.items()}

    if verbose:
        logger.info(
            "assign_taxonomy: %d references, %d unique genera, %d levels",
            len(ref_sequences), n_genus, n_levels,
        )

    # ------------------------------------------------------------------
    # 5. Call the C classifier
    # ------------------------------------------------------------------
    result = run_taxonomy(
        query_seqs, ref_sequences,
        ref_to_genus, genusmat,
        n_genus, n_levels,
        verbose=verbose,
    )
    rval = result["rval"]    # (n_seqs,) int32, 1-indexed best genus (0=NA)
    rboot = result["rboot"]  # (n_seqs, n_levels) int32, bootstrap counts

    # ------------------------------------------------------------------
    # 5b. Optionally try reverse complement for unassigned sequences
    # ------------------------------------------------------------------
    if try_rc:
        unassigned = np.where(rval == 0)[0]
        if len(unassigned) > 0:
            rc_seqs = [_rc(query_seqs[i]) for i in unassigned]
            if verbose:
                logger.info(
                    "assign_taxonomy: trying reverse complement for %d "
                    "unassigned sequences", len(unassigned),
                )
            rc_result = run_taxonomy(
                rc_seqs, ref_sequences,
                ref_to_genus, genusmat,
                n_genus, n_levels,
                verbose=verbose,
            )
            rc_rval = rc_result["rval"]
            rc_rboot = rc_result["rboot"]
            # Fill in previously unassigned positions
            for j, orig_idx in enumerate(unassigned):
                if rc_rval[j] != 0:
                    rval[orig_idx] = rc_rval[j]
                    rboot[orig_idx, :] = rc_rboot[j, :]

    # ------------------------------------------------------------------
    # 6. Parse results into taxonomy strings, masking low-bootstrap ranks
    # ------------------------------------------------------------------
    tax_out = np.full((n_seqs, n_levels), None, dtype=object)
    boot_out = np.zeros((n_seqs, n_levels), dtype=np.int64)

    for i in range(n_seqs):
        gidx = rval[i]
        if gidx == 0:
            # No assignment on either strand: taxonomy stays None and the
            # bootstrap row stays 0.
            continue
        # rval is 1-indexed; convert to 0-indexed.
        gidx_0 = gidx - 1
        taxa_tuple = idx_to_genus.get(gidx_0)
        if taxa_tuple is None:
            continue

        # Bootstrap values are reported for every rank, including ranks
        # whose taxonomy string is masked below (dada2's outputBootstraps
        # behaviour).  Previously ranks after the first masked rank were
        # incorrectly left at 0 because the masking loop broke early.
        boot_out[i, :] = rboot[i, :]

        for lvl in range(n_levels):
            val = taxa_tuple[lvl]
            # Mask ranks below min_boot or with the unspecified sentinel.
            if val == _UNSPECIFIED:
                tax_out[i, lvl] = None
            elif rboot[i, lvl] < min_boot:
                # Once a rank is masked, all deeper ranks should also be
                # masked (dada2 behaviour).
                tax_out[i, lvl:] = None
                break
            else:
                tax_out[i, lvl] = val

    # ------------------------------------------------------------------
    # 7. Build output DataFrame(s)
    # ------------------------------------------------------------------
    tax_levels_list = list(tax_levels)

    tax_df = pd.DataFrame(tax_out, columns=tax_levels_list, index=query_seqs)
    tax_df.index.name = "sequence"

    if verbose:
        assigned_counts = [
            int((tax_df[col].notna()).sum()) for col in tax_levels_list
        ]
        for col, cnt in zip(tax_levels_list, assigned_counts):
            logger.info("assign_taxonomy: %s assigned: %d / %d",
                         col, cnt, n_seqs)

    if output_bootstraps:
        boot_df = pd.DataFrame(
            boot_out, columns=tax_levels_list, index=query_seqs,
        )
        boot_df.index.name = "sequence"
        return {"tax": tax_df, "boot": boot_df}

    return tax_df

papa2.utils — Utilities

Utility functions for taxonomy assignment, sequence table operations, quality profiling, FASTA I/O, PhiX detection, and sequence complexity analysis.

make_sequence_table

make_sequence_table(samples_dict, order_by='abundance')

Construct a sample-by-sequence observation matrix.

Parameters:

Name Type Description Default
samples_dict Dict[str, Dict[str, int]]

{sample_name: {sequence: abundance}}.

required
order_by str

How to order the columns. "abundance" (default) sorts by total abundance across all samples (descending). "nsamples" sorts by number of samples in which the sequence appears. None for no ordering.

'abundance'

Returns:

Type Description

A dict with keys "table" (numpy int matrix, samples x sequences), "seqs" (ordered sequence list), and "sample_names".

Source code in papa2/utils.py
def make_sequence_table(
    samples_dict: Dict[str, Dict[str, int]],
    order_by: str = "abundance",
):
    """Construct a sample-by-sequence observation matrix.

    Accepts several input shapes:

      * ``{sample_name: {sequence: abundance}}``
      * ``{sample_name: dada_result}`` (its ``"denoised"`` dict is used)
      * ``{sample_name: [merger_dict, ...]}`` (from ``merge_pairs``; only
        mergers with a truthy ``"accept"`` are counted)
      * a plain ``list`` of any of the above values (samples are named
        ``sample_1``, ``sample_2``, ...)

    Args:
        samples_dict: Per-sample abundance data in one of the shapes above.
        order_by: How to order the columns.  ``"abundance"`` (default) sorts
            by total abundance across all samples (descending).
            ``"nsamples"`` sorts by number of samples in which the sequence
            appears.  Any other value (e.g. ``None``) keeps insertion order.

    Returns:
        A dict with keys ``"table"`` (numpy int matrix, samples x
        sequences), ``"seqs"`` (list of sequences, one per column) and
        ``"sample_names"`` (list of row labels).
    """
    # Accept a list of dada results or merger lists (extract denoised dicts)
    if isinstance(samples_dict, list):
        converted = {}
        for i, item in enumerate(samples_dict):
            name = f"sample_{i+1}"
            if isinstance(item, dict) and "denoised" in item:
                converted[name] = item["denoised"]
            elif isinstance(item, dict):
                converted[name] = item
            elif isinstance(item, list):
                # List of merger dicts (from merge_pairs)
                merged = {}
                for m in item:
                    if m.get("accept", True):
                        seq = m["sequence"]
                        merged[seq] = merged.get(seq, 0) + m["abundance"]
                converted[name] = merged
            else:
                raise TypeError(f"Unexpected item type in samples list: {type(item)}")
        samples_dict = converted

    # Also handle {name: [merger_list]} dicts and {name: dada_result} dicts
    converted2 = {}
    for sample_name, value in samples_dict.items():
        if isinstance(value, list):
            # List of merger dicts from merge_pairs.  The for/else runs the
            # `else` only when no `break` fired, i.e. every item looked like
            # a merger dict; otherwise the value is kept unchanged.
            merged = {}
            for m in value:
                if isinstance(m, dict) and "sequence" in m:
                    if m.get("accept", True):
                        seq = m["sequence"]
                        merged[seq] = merged.get(seq, 0) + m["abundance"]
                else:
                    break  # Not merger dicts, leave as-is
            else:
                converted2[sample_name] = merged
                continue
            converted2[sample_name] = value
        elif isinstance(value, dict) and "denoised" in value:
            converted2[sample_name] = value["denoised"]
        else:
            converted2[sample_name] = value
    samples_dict = converted2

    logger.info("[INFO] make_sequence_table: %d samples.", len(samples_dict))

    # Gather all unique sequences (dict preserves first-seen order)
    all_seqs: Dict[str, int] = {}
    for sample_name, uniques in samples_dict.items():
        for seq in uniques:
            all_seqs[seq] = 0

    seq_list = list(all_seqs.keys())
    sample_names = list(samples_dict.keys())

    mat = np.zeros((len(sample_names), len(seq_list)), dtype=int)
    seq_index = {s: i for i, s in enumerate(seq_list)}

    for i, sample_name in enumerate(sample_names):
        for seq, ab in samples_dict[sample_name].items():
            mat[i, seq_index[seq]] = int(ab)

    # Order columns
    if order_by == "abundance":
        col_order = np.argsort(-mat.sum(axis=0))
    elif order_by == "nsamples":
        col_order = np.argsort(-(mat > 0).sum(axis=0))
    else:
        col_order = np.arange(len(seq_list))

    ordered_seqs = [seq_list[i] for i in col_order]
    ordered_mat = mat[:, col_order]

    logger.info("[INFO] make_sequence_table: %d samples x %d ASVs.",
                len(sample_names), len(ordered_seqs))

    return {
        "table": ordered_mat,
        "seqs": ordered_seqs,
        "sample_names": sample_names,
    }

assign_species

assign_species(seqs, ref_fasta, allow_multiple=False, try_rc=False)

Taxonomic assignment to species level by exact matching.

Each query sequence is searched as a substring against reference sequences. Reference FASTA headers must be in the format::

>SeqID Genus species
ACGAATGTGAAGTAA...

Parameters:

Name Type Description Default
seqs

Sequences to classify (any type accepted by get_sequences).

required
ref_fasta str

Path to reference FASTA file (may be gzipped).

required
allow_multiple Union[bool, int]

If False, only unambiguous (single) species matches are returned. If True, all matching species are returned (concatenated with /). If an integer, at most that many matching species are returned.

False
try_rc bool

If True, also search the reverse complement of each query.

False

Returns:

Type Description
ndarray

A numpy object array of shape (N, 2) with columns
["Genus", "Species"]. Rows correspond to input sequences.
None entries indicate no match.

Source code in papa2/utils.py
def assign_species(
    seqs,
    ref_fasta: str,
    allow_multiple: Union[bool, int] = False,
    try_rc: bool = False,
) -> np.ndarray:
    """Taxonomic assignment to species level by exact matching.

    Each query sequence is searched as a substring against reference sequences.
    Reference FASTA headers must be in the format::

        >SeqID Genus species
        ACGAATGTGAAGTAA...

    Args:
        seqs: Sequences to classify (any type accepted by ``get_sequences``).
        ref_fasta: Path to reference FASTA file (may be gzipped).
        allow_multiple: If False, only unambiguous (single) species matches
            are returned.  If True, all matching species are returned
            (concatenated with ``/``).  If an integer, at most that many
            matching species are returned.
        try_rc: If True, also search the reverse complement of each query.

    Returns:
        A numpy object array of shape ``(N, 2)`` with columns
        ``["Genus", "Species"]``.  Rows correspond to input sequences.
        ``None`` entries indicate no match.
    """
    seqs = get_sequences(seqs)
    logger.info("[INFO] assign_species: classifying %d sequences against %s",
                len(seqs), ref_fasta)

    # Maximum number of distinct species names allowed in an assignment.
    if isinstance(allow_multiple, bool):
        keep = float("inf") if allow_multiple else 1
    else:
        keep = int(allow_multiple)

    # Parse reference
    ref_records = _parse_fasta(ref_fasta)
    ref_seqs: List[str] = []
    ref_genus: List[str] = []
    ref_species: List[str] = []
    for header, rseq in ref_records:
        parts = header.split()
        if len(parts) >= 3:
            ref_genus.append(parts[1])
            ref_species.append(parts[2])
        elif len(parts) == 2:
            ref_genus.append(parts[1])
            ref_species.append("")
        else:
            ref_genus.append("")
            ref_species.append("")
        ref_seqs.append(rseq)

    n = len(seqs)
    result = np.full((n, 2), None, dtype=object)

    # Build a kmer index over reference sequences for fast candidate lookup.
    # Instead of checking every query against every ref (O(n*m)), we only
    # verify substring containment for refs that share a kmer with the query.
    # Index every position so we do not miss true substring matches whose
    # shared kmer starts at a non-binned offset.
    _KMER_SIZE = 8
    kmer_index: Dict[str, List[int]] = {}
    for j, rseq in enumerate(ref_seqs):
        rseq_upper = rseq.upper()
        for pos in range(len(rseq_upper) - _KMER_SIZE + 1):
            kmer = rseq_upper[pos:pos + _KMER_SIZE]
            if kmer not in kmer_index:
                kmer_index[kmer] = []
            kmer_index[kmer].append(j)

    def _find_hits(query_up):
        """Find ref indices containing query_up as a substring, using kmer pre-filter."""
        if len(query_up) < _KMER_SIZE:
            # Too short for the kmer index -- fall back to a linear scan.
            return [j for j, ref in enumerate(ref_seqs) if query_up in ref.upper()]
        candidates: set = set()
        for pos in range(len(query_up) - _KMER_SIZE + 1):
            kmer = query_up[pos:pos + _KMER_SIZE]
            if kmer in kmer_index:
                candidates.update(kmer_index[kmer])
        # Verify actual substring containment on candidates only
        return [j for j in candidates if query_up in ref_seqs[j].upper()]

    for i, query in enumerate(seqs):
        query_up = query.upper()
        hit_indices = _find_hits(query_up)
        # Also check reverse complement
        if try_rc and not hit_indices:
            query_rc = _rc(query_up)
            hit_indices = _find_hits(query_rc)

        if not hit_indices:
            continue

        # Map hits to genus/species, collapsing Escherichia/Shigella
        genera = [ref_genus[j] for j in hit_indices]
        species = [ref_species[j] for j in hit_indices]

        # Genus: only unambiguous (single unique genus).  Escherichia and
        # Shigella are collapsed into one label before uniqueness is tested.
        genera_clean = [
            "Escherichia/Shigella"
            if ("Escherichia" in g or "Shigella" in g) else g
            for g in genera
        ]
        unique_genera = sorted(set(genera_clean))
        if len(unique_genera) == 1:
            result[i, 0] = unique_genera[0]
        else:
            result[i, 0] = None

        # Species: keep only if the number of distinct (non-empty) species
        # names is within the `keep` threshold.
        unique_species = sorted(set(s for s in species if s))
        if len(unique_species) == 0:
            result[i, 1] = None
        elif len(unique_species) <= keep:
            result[i, 1] = "/".join(unique_species)
        else:
            result[i, 1] = None

    logger.info("[INFO] assign_species: %d of %d assigned to species level.",
                sum(result[:, 1] != None), n)  # noqa: E711
    return result

add_species

add_species(tax_table, ref_fasta, allow_multiple=False, try_rc=False)

Add species-level annotation to an existing taxonomy DataFrame.

Wraps assign_species and appends a "Species" column. Only species assignments whose genus is consistent with the genus already present in tax_table are kept.

Parameters:

Name Type Description Default
tax_table

A pandas DataFrame with sequences as the index and taxonomic levels as columns. Must include a "Genus" column (or the last column is used).

required
ref_fasta str

Path to species-level reference FASTA.

required
allow_multiple Union[bool, int]

Passed to assign_species.

False
try_rc bool

Passed to assign_species.

False

Returns:

Type Description

The input DataFrame with an added "Species" column.

Source code in papa2/utils.py
def add_species(
    tax_table,
    ref_fasta: str,
    allow_multiple: Union[bool, int] = False,
    try_rc: bool = False,
):
    """Add species-level annotation to an existing taxonomy DataFrame.

    Wraps ``assign_species`` and appends a ``"Species"`` column.  Only species
    assignments whose genus is consistent (per ``_match_genera``) with the
    genus already present in ``tax_table`` are kept.

    Args:
        tax_table: A pandas DataFrame with sequences as the index and
            taxonomic levels as columns.  Must include a ``"Genus"`` column
            (otherwise the last column is used as the genus).
        ref_fasta: Path to species-level reference FASTA.
        allow_multiple: Passed to ``assign_species``.
        try_rc: Passed to ``assign_species``.

    Returns:
        A copy of the input DataFrame with an added ``"Species"`` column.
    """
    seqs = list(tax_table.index)
    binom = assign_species(seqs, ref_fasta,
                           allow_multiple=allow_multiple, try_rc=try_rc)

    # Determine which column holds the genus assignment.
    if "Genus" in tax_table.columns:
        gcol = "Genus"
    else:
        gcol = tax_table.columns[-1]

    species_col = []
    for i, seq in enumerate(seqs):
        tax_genus = tax_table.loc[seq, gcol] if seq in tax_table.index else None
        binom_genus = binom[i, 0]
        binom_species = binom[i, 1]

        # Keep the species call only when its genus agrees with the
        # existing genus-level assignment.
        if _match_genera(tax_genus, binom_genus):
            species_col.append(binom_species)
        else:
            species_col.append(None)

    # Copy so the caller's DataFrame is not mutated.
    tax_table = tax_table.copy()
    tax_table["Species"] = species_col
    return tax_table

collapse_no_mismatch

collapse_no_mismatch(seqtab)

Merge ASVs whose sequences are identical except for length differences.

If sequence A is a substring of sequence B (allowing only leading/trailing gaps, i.e. one is a prefix or suffix or internal subsequence with no mismatches), their abundances are merged under the longer sequence.

Sequences are processed in decreasing order of total abundance. For each query, if it is found as a substring of an already-accepted sequence (or vice versa), it is collapsed into that representative.

Parameters:

Name Type Description Default
seqtab

Either a dict mapping {sequence: abundance} or a pandas DataFrame (samples x sequences, column names are seqs).

required

Returns:

Type Description
dict or DataFrame

If input was a dict, returns a collapsed {sequence: abundance}
dict. If input was a DataFrame, returns a collapsed DataFrame.

Source code in papa2/utils.py
def collapse_no_mismatch(seqtab):
    """Merge ASVs whose sequences are identical except for length differences.

    If sequence A is a substring of sequence B (allowing only leading/trailing
    gaps, i.e. one is a prefix or suffix or internal subsequence with no
    mismatches), their abundances are merged under the longer sequence.

    Sequences are processed in decreasing order of total abundance.  For each
    query, if it is found as a substring of an already-accepted sequence (or
    vice versa), it is collapsed into that representative.

    Args:
        seqtab: Either a ``dict`` mapping ``{sequence: abundance}``, a
            pandas DataFrame (samples x sequences, column names are seqs),
            or a ``{"table": ..., "seqs": ...}`` dict as produced by
            ``make_sequence_table``.

    Returns:
        A collapsed object of the same kind as the input: a
        ``{sequence: abundance}`` dict, a DataFrame, or a
        ``{"table", "seqs", "sample_names"}`` dict respectively.
    """
    import pandas as pd

    is_df = isinstance(seqtab, pd.DataFrame)
    is_table_dict = (isinstance(seqtab, dict) and "table" in seqtab and "seqs" in seqtab)

    if is_df:
        # Column names are sequences; collapse column-wise
        seq_abundances = seqtab.sum(axis=0).to_dict()
    elif is_table_dict:
        tbl = seqtab["table"]
        seqs = seqtab["seqs"]
        seq_abundances = {s: int(tbl[:, i].sum()) for i, s in enumerate(seqs)}
    else:
        seq_abundances = dict(seqtab)

    logger.info("[INFO] collapse_no_mismatch: %d input sequences.",
                len(seq_abundances))

    # Sort by decreasing total abundance
    sorted_seqs = sorted(seq_abundances.keys(),
                         key=lambda s: seq_abundances[s], reverse=True)

    # Build mapping: query seq -> representative seq.  reps_upper mirrors
    # `representatives` with pre-uppercased strings so the O(n^2) substring
    # loop does not re-uppercase each representative for every query.
    representatives: List[str] = []
    reps_upper: List[str] = []
    merge_map: Dict[str, str] = {}  # query -> representative

    for query in sorted_seqs:
        added = False
        q_upper = query.upper()
        for idx, r_upper in enumerate(reps_upper):
            if q_upper in r_upper:
                # query is a subsequence of ref — merge under ref (longer)
                merge_map[query] = representatives[idx]
                added = True
                break
            elif r_upper in q_upper:
                # ref is a subsequence of query — query is longer, so it
                # becomes the new representative and ref merges into it
                ref = representatives[idx]
                merge_map[query] = query
                # Re-point everything that was merged into ref → query
                for k, v in merge_map.items():
                    if v == ref:
                        merge_map[k] = query
                representatives[idx] = query
                reps_upper[idx] = q_upper
                added = True
                break
        if not added:
            representatives.append(query)
            reps_upper.append(q_upper)
            merge_map[query] = query

    if is_df:
        # Build collapsed DataFrame
        collapsed = pd.DataFrame(0, index=seqtab.index,
                                 columns=representatives)
        for seq in seqtab.columns:
            rep = merge_map[seq]
            collapsed[rep] = collapsed[rep] + seqtab[seq].values
        col_order = collapsed.sum(axis=0).sort_values(ascending=False).index
        collapsed = collapsed[col_order]
        logger.info("[INFO] collapse_no_mismatch: %d output sequences.",
                    len(collapsed.columns))
        return collapsed
    elif is_table_dict:
        tbl = seqtab["table"]
        seqs = seqtab["seqs"]
        rep_idx = {r: i for i, r in enumerate(representatives)}
        new_mat = np.zeros((tbl.shape[0], len(representatives)), dtype=tbl.dtype)
        for j, seq in enumerate(seqs):
            rep = merge_map[seq]
            new_mat[:, rep_idx[rep]] += tbl[:, j]
        col_order = np.argsort(-new_mat.sum(axis=0))
        logger.info("[INFO] collapse_no_mismatch: %d output sequences.",
                    len(representatives))
        return {
            "table": new_mat[:, col_order],
            "seqs": [representatives[i] for i in col_order],
            "sample_names": seqtab.get("sample_names", []),
        }
    else:
        out: Dict[str, int] = {}
        for seq, ab in seq_abundances.items():
            rep = merge_map[seq]
            out[rep] = out.get(rep, 0) + int(ab)
        # Sort by decreasing abundance
        out = dict(sorted(out.items(), key=lambda x: x[1], reverse=True))
        logger.info("[INFO] collapse_no_mismatch: %d output sequences.",
                    len(out))
        return out

plot_quality_profile

plot_quality_profile(fastq_files, n=500000, output_pdf=None)

Plot mean quality score per cycle position from FASTQ file(s).

Reads up to n records from each file, computes per-position quality statistics, and produces a matplotlib figure with mean (green), median (orange), and 25th/75th percentile (dashed orange) quality lines, similar to the R plotQualityProfile.

Parameters:

Name Type Description Default
fastq_files Union[str, List[str]]

One or more FASTQ file paths (may be gzipped).

required
n int

Maximum number of reads to sample per file.

500000
output_pdf Optional[str]

If provided, save the figure to this PDF path.

None

Returns:

Type Description

A numpy array of quality scores with shape (n_reads, max_len)

(values are NaN where reads are shorter than max_len).

Source code in papa2/utils.py
def plot_quality_profile(
    fastq_files: Union[str, List[str]],
    n: int = 500000,
    output_pdf: Optional[str] = None,
):
    """Plot mean quality score per cycle position from FASTQ file(s).

    Reads up to *n* records from each file, computes per-position quality
    statistics, and produces a matplotlib figure with mean (green), median
    (orange), and 25th/75th percentile (dashed orange) quality lines,
    similar to the R ``plotQualityProfile``.

    Args:
        fastq_files: One or more FASTQ file paths (may be gzipped).
        n: Maximum number of reads to sample per file.
        output_pdf: If provided, save the figure to this PDF path.

    Returns:
        A numpy array of quality scores with shape ``(n_reads, max_len)``
        (values are NaN where reads are shorter than max_len).
    """
    # Lazy import with a non-interactive backend so this also works in
    # headless environments (CI, servers without a display).
    import matplotlib
    matplotlib.use("Agg")
    import matplotlib.pyplot as plt

    if isinstance(fastq_files, str):
        fastq_files = [fastq_files]

    logger.info("[INFO] plot_quality_profile: processing %d file(s), n=%d.",
                len(fastq_files), n)

    all_quals: List[List[List[int]]] = []  # per-file list of per-read quals
    file_labels: List[str] = []

    for fpath in fastq_files:
        quals_per_read: List[List[int]] = []
        opener = gzip.open if fpath.endswith(".gz") else open
        count = 0
        with opener(fpath, "rt") as fh:
            while count < n:
                header = fh.readline()
                if not header:
                    break  # clean EOF
                fh.readline()  # sequence line (not needed here)
                fh.readline()  # '+' separator line (not needed here)
                qual_raw = fh.readline()
                if not qual_raw:
                    # Truncated record at EOF: drop the partial entry.
                    # NOTE: test the raw line (not the stripped one) so a
                    # legitimate zero-length read does not stop sampling.
                    break
                # Phred+33 encoding: quality = ASCII code - 33.
                quals_per_read.append(
                    [ord(c) - 33 for c in qual_raw.rstrip("\n\r")]
                )
                count += 1
        all_quals.append(quals_per_read)
        file_labels.append(os.path.basename(fpath))
        logger.info("[INFO] plot_quality_profile: read %d records from %s.",
                    count, os.path.basename(fpath))

    # Build quality matrix (all files combined for return value).  Shorter
    # reads are NaN-padded so nan-aware statistics work per position.
    max_len = max(
        max((len(q) for q in qlist), default=0)
        for qlist in all_quals
    ) if all_quals else 0
    total_reads = sum(len(qlist) for qlist in all_quals)
    qual_matrix = np.full((total_reads, max_len), np.nan)
    row = 0
    for qlist in all_quals:
        for q in qlist:
            qual_matrix[row, :len(q)] = q
            row += 1

    # Plot: one panel per input file.
    n_files = len(fastq_files)
    fig, axes = plt.subplots(1, n_files, figsize=(6 * n_files, 4),
                             squeeze=False)

    for fi, qlist in enumerate(all_quals):
        ax = axes[0, fi]
        n_reads = len(qlist)
        if n_reads == 0:
            ax.set_title(file_labels[fi])
            continue

        local_max = max(len(q) for q in qlist)
        local_mat = np.full((n_reads, local_max), np.nan)
        for ri, q in enumerate(qlist):
            local_mat[ri, :len(q)] = q

        positions = np.arange(1, local_max + 1)
        means = np.nanmean(local_mat, axis=0)
        q25 = np.nanpercentile(local_mat, 25, axis=0)
        q50 = np.nanpercentile(local_mat, 50, axis=0)
        q75 = np.nanpercentile(local_mat, 75, axis=0)

        ax.plot(positions, means, color="#66C2A5", linewidth=1.0,
                label="Mean")
        ax.plot(positions, q50, color="#FC8D62", linewidth=0.8,
                label="Median")
        ax.plot(positions, q25, color="#FC8D62", linewidth=0.5,
                linestyle="--", label="Q25")
        ax.plot(positions, q75, color="#FC8D62", linewidth=0.5,
                linestyle="--", label="Q75")
        ax.set_xlabel("Cycle")
        ax.set_ylabel("Quality Score")
        ax.set_title(file_labels[fi])

        # ">=" signals that the n-read sampling cap was hit for this file.
        read_label = (f"Reads >= {n}" if n_reads >= n
                      else f"Reads: {n_reads}")
        ax.text(0.02, 0.02, read_label, transform=ax.transAxes,
                color="red", fontsize=8, verticalalignment="bottom")
        ax.set_ylim(bottom=0)

    plt.tight_layout()
    if output_pdf:
        fig.savefig(output_pdf, format="pdf")
        logger.info("[INFO] plot_quality_profile: saved to %s.", output_pdf)
    plt.close(fig)

    return qual_matrix

uniquesto_fasta

uniquesto_fasta(uniques, fasta_path, ids=None)

Write a uniques dict or list of sequences to a FASTA file.

If uniques is a dict {seq: abundance}, headers are formatted as >sq1;size=1234; (uchime-compatible) unless custom ids are given.

Parameters:

Name Type Description Default
uniques

A dict {sequence: abundance} or any object accepted by get_uniques.

required
fasta_path str

Output FASTA file path.

required
ids Optional[List[str]]

Optional custom sequence identifiers.

None
Source code in papa2/utils.py
def uniquesto_fasta(
    uniques,
    fasta_path: str,
    ids: Optional[List[str]] = None,
) -> None:
    """Write a uniques dict or list of sequences to a FASTA file.

    When *uniques* is a dict ``{seq: abundance}`` and no custom *ids* are
    supplied, each header takes the uchime-compatible form
    ``>sq1;size=1234;``.

    Args:
        uniques: A dict ``{sequence: abundance}`` or any object accepted by
            ``get_uniques``.
        fasta_path: Output FASTA file path.
        ids: Optional custom sequence identifiers.
    """
    unique_map = get_uniques(uniques)

    if ids is not None and len(ids) != len(unique_map):
        raise ValueError(
            f"Length of ids ({len(ids)}) != number of sequences ({len(unique_map)})."
        )

    with open(fasta_path, "w") as out:
        for idx, (sequence, abundance) in enumerate(unique_map.items()):
            if ids is None:
                label = f"sq{idx + 1};size={abundance};"
            else:
                label = ids[idx]
            out.write(f">{label}\n{sequence}\n")

write_fasta

write_fasta(seqs, fasta_path, ids=None)

Write a list of sequences to a FASTA file.

Parameters:

Name Type Description Default
seqs Sequence[str]

Iterable of DNA sequence strings.

required
fasta_path str

Output file path.

required
ids Optional[List[str]]

Optional list of identifiers (one per sequence). Defaults to 1, 2, 3, ....

None
Source code in papa2/utils.py
def write_fasta(
    seqs: Sequence[str],
    fasta_path: str,
    ids: Optional[List[str]] = None,
) -> None:
    """Write a list of sequences to a FASTA file.

    Args:
        seqs: Iterable of DNA sequence strings.
        fasta_path: Output file path.
        ids: Optional list of identifiers (one per sequence).
            Defaults to ``1, 2, 3, ...``.
    """
    records = list(seqs)
    if ids is not None and len(ids) != len(records):
        raise ValueError(
            f"Length of ids ({len(ids)}) != number of sequences ({len(records)})."
        )
    # Default identifiers are simple 1-based ordinals.
    labels = ids if ids is not None else [str(k + 1) for k in range(len(records))]
    with open(fasta_path, "w") as out:
        out.writelines(f">{label}\n{seq}\n"
                       for label, seq in zip(labels, records))

is_phix

is_phix(seqs, ref_path=None, word_size=16, min_matches=2)

Check sequences against the PhiX genome using kmer matching.

For each query sequence, kmers of size word_size are extracted and compared to the PhiX reference (forward and reverse complement). A sequence is flagged as PhiX if at least min_matches kmers hit.

Parameters:

Name Type Description Default
seqs

Sequences to check (any type accepted by get_sequences).

required
ref_path Optional[str]

Path to a FASTA file containing the PhiX genome. If None, a built-in 100 bp snippet is used for matching.

None
word_size int

Kmer size for matching.

16
min_matches int

Minimum number of kmer hits to call PhiX.

2

Returns:

Type Description
ndarray

A boolean numpy array, True where a sequence matches PhiX.

Source code in papa2/utils.py
def is_phix(
    seqs,
    ref_path: Optional[str] = None,
    word_size: int = 16,
    min_matches: int = 2,
) -> np.ndarray:
    """Check sequences against the PhiX genome using kmer matching.

    Every ``word_size``-mer of each query is looked up in the set of kmers
    from the PhiX reference (both strands).  A query is flagged as PhiX
    once at least ``min_matches`` of its kmers hit.

    Args:
        seqs: Sequences to check (any type accepted by ``get_sequences``).
        ref_path: Path to a FASTA file containing the PhiX genome.  If None,
            a built-in 100 bp snippet is used for matching.
        word_size: Kmer size for matching.
        min_matches: Minimum number of kmer hits to call PhiX.

    Returns:
        A boolean numpy array, True where a sequence matches PhiX.
    """
    queries = get_sequences(seqs)

    # Choose the reference: user-supplied FASTA or the bundled snippet.
    if ref_path is None:
        reference = _PHIX_100BP.upper()
    else:
        reference = "".join(s for _, s in _parse_fasta(ref_path)).upper()

    def _kmers_of(text: str) -> set:
        """All overlapping word_size-mers of *text*."""
        return {text[p:p + word_size]
                for p in range(len(text) - word_size + 1)}

    # Index both orientations of the reference once, up front.
    target_kmers = _kmers_of(reference) | _kmers_of(_rc(reference))

    flags = np.zeros(len(queries), dtype=bool)
    for idx, query in enumerate(queries):
        query = query.upper()
        n_hits = 0
        for p in range(len(query) - word_size + 1):
            if query[p:p + word_size] in target_kmers:
                n_hits += 1
                # Stop scanning as soon as the threshold is reached.
                if n_hits >= min_matches:
                    break
        flags[idx] = n_hits >= min_matches

    return flags

seq_complexity

seq_complexity(seqs, kmer_size=2, window=None, by=5)

Calculate sequence complexity as Shannon effective number of kmers.

Complexity is the exponential of the Shannon entropy of kmer frequencies. A perfectly random sequence of sufficient length will approach 4**kmer_size. Repetitive / low-complexity sequences will have values well below this maximum.

If a window is provided, the minimum complexity observed over a sliding window along each sequence is returned.

Parameters:

Name Type Description Default
seqs

Sequences (any type accepted by get_sequences).

required
kmer_size int

Size of kmers to count. Default 2 (dinucleotides).

2
window Optional[int]

Width of sliding window in nucleotides. If None, the whole sequence is used.

None
by int

Step size for the sliding window.

5

Returns:

Type Description
ndarray

A numpy array of complexity values, one per input sequence.

Source code in papa2/utils.py
def seq_complexity(
    seqs,
    kmer_size: int = 2,
    window: Optional[int] = None,
    by: int = 5,
) -> np.ndarray:
    """Calculate sequence complexity as Shannon effective number of kmers.

    Complexity is the exponential of the Shannon entropy of kmer frequencies.
    A perfectly random sequence of sufficient length will approach
    ``4**kmer_size``; repetitive / low-complexity sequences fall well below
    this maximum.

    If *window* is given, the minimum complexity observed over a sliding
    window along each sequence is returned instead.

    Args:
        seqs: Sequences (any type accepted by ``get_sequences``).
        kmer_size: Size of kmers to count.  Default 2 (dinucleotides).
        window: Width of sliding window in nucleotides.  If None, the
            whole sequence is used.
        by: Step size for the sliding window.

    Returns:
        A numpy array of complexity values, one per input sequence.
    """
    sequences = get_sequences(seqs)
    max_complexity = 4 ** kmer_size

    if window is not None and kmer_size >= window:
        raise ValueError("window must be larger than kmer_size.")

    def _complexity_of(fragment: str) -> float:
        """Shannon effective kmer number for one fragment."""
        if len(fragment) < kmer_size:
            return 0.0
        tally: Dict[str, int] = {}
        for pos in range(len(fragment) - kmer_size + 1):
            word = fragment[pos:pos + kmer_size]
            # Skip kmers containing ambiguous (non-ACGT) bases.
            if set(word) <= set("ACGT"):
                tally[word] = tally.get(word, 0) + 1
        return _sindex(list(tally.values())) if tally else 0.0

    out = np.zeros(len(sequences), dtype=float)
    for idx, sequence in enumerate(sequences):
        sequence = sequence.upper()
        if window is None or len(sequence) < window:
            # No windowing requested, or the read is shorter than one
            # window: score the whole sequence.
            out[idx] = _complexity_of(sequence)
        else:
            out[idx] = min(
                (_complexity_of(sequence[start:start + window])
                 for start in range(0, len(sequence) - window + 1, by)),
                default=max_complexity,
            )

    return out

get_sequences

get_sequences(obj, collapse=False)

Extract a list of sequences from various dada2 object types.

Supported inputs
  • list / tuple of strings (returned as-is, upper-cased)
  • dict {seq: abundance} (keys returned)
  • pandas DataFrame with a "sequence" column
  • pandas DataFrame where column names are sequences (sequence table)
  • numpy character matrix with row names (taxonomy table -- not common in Python, included for completeness)
  • a single file path to a FASTA/FASTQ file

Parameters:

Name Type Description Default
obj

Input object.

required
collapse bool

If True, remove duplicate sequences.

False

Returns:

Type Description
List[str]

List of upper-case DNA sequence strings.

Source code in papa2/utils.py
def get_sequences(obj, collapse: bool = False) -> List[str]:
    """Extract a list of sequences from various dada2 object types.

    Supported inputs:
        - list / tuple of strings (returned as-is, upper-cased)
        - dict ``{seq: abundance}``  (keys returned)
        - pandas DataFrame with a ``"sequence"`` column
        - pandas DataFrame where column names are sequences (sequence table)
        - numpy character matrix with row names (taxonomy table -- not common
          in Python, included for completeness)
        - a single file path to a FASTA/FASTQ file

    Args:
        obj: Input object.
        collapse: If True, remove duplicate sequences.

    Returns:
        List of upper-case DNA sequence strings.

    Raises:
        TypeError: If sequences cannot be extracted from *obj*.
    """
    # String -- could be a file path or a single sequence
    if isinstance(obj, str):
        if os.path.isfile(obj):
            recs = _parse_fasta(obj)
            return [seq.upper() for _, seq in recs]
        return [obj.upper()]

    # Dict {seq: abundance}
    if isinstance(obj, dict):
        seqs = [str(k).upper() for k in obj.keys()]
        if collapse:
            # dict.fromkeys preserves first-seen order while deduplicating.
            seqs = list(dict.fromkeys(seqs))
        return seqs

    # Try pandas objects
    try:
        import pandas as pd
        if isinstance(obj, pd.DataFrame):
            if "sequence" in obj.columns:
                seqs = obj["sequence"].astype(str).str.upper().tolist()
            else:
                # Sequence table: column names are sequences
                seqs = [str(c).upper() for c in obj.columns]
            if collapse:
                seqs = list(dict.fromkeys(seqs))
            return seqs
        if isinstance(obj, pd.Series):
            return [str(v).upper() for v in obj.values]
    except ImportError:
        pass

    # Iterable of strings
    if hasattr(obj, "__iter__"):
        seqs = [str(s).upper() for s in obj]
        if collapse:
            seqs = list(dict.fromkeys(seqs))
        return seqs

    # Fix: the message previously used "{{seq: abundance}}" in a plain
    # string literal, so users saw literal doubled braces.
    raise TypeError(
        "Cannot extract sequences from object of type "
        f"{type(obj).__name__}. Provide a list of strings, a dict "
        "{seq: abundance}, a pandas DataFrame, or a FASTA file path."
    )

get_uniques

get_uniques(obj, collapse=True)

Extract a {sequence: abundance} dictionary from various types.

Supported inputs
  • dict {seq: abundance} -- returned directly (optionally collapsed)
  • list / tuple of strings -- each occurrence counted
  • pandas DataFrame with "sequence" and "abundance" columns
  • pandas DataFrame where columns are sequences (sequence table) -- column sums used as abundances
  • a single FASTA file path (each record gets abundance 1)

Parameters:

Name Type Description Default
obj

Input object.

required
collapse bool

If True, merge duplicate sequences by summing abundances.

True

Returns:

Type Description
Dict[str, int]

Dictionary mapping upper-case sequence strings to integer abundances.

Source code in papa2/utils.py
def get_uniques(obj, collapse: bool = True) -> Dict[str, int]:
    """Extract a ``{sequence: abundance}`` dictionary from various types.

    Supported inputs:
        - dict ``{seq: abundance}`` -- returned directly (optionally collapsed)
        - list / tuple of strings -- each occurrence counted
        - pandas DataFrame with ``"sequence"`` and ``"abundance"`` columns
        - pandas DataFrame where columns are sequences (sequence table) --
          column sums used as abundances
        - a single FASTA file path (each record gets abundance 1)

    Args:
        obj: Input object.
        collapse: If True, merge duplicate sequences by summing abundances.

    Returns:
        Dictionary mapping upper-case sequence strings to integer abundances.
    """
    # Already a dict -- upper-case the keys, summing or overwriting
    # duplicates according to `collapse`.
    if isinstance(obj, dict):
        merged: Dict[str, int] = {}
        for key, value in obj.items():
            upper_key = str(key).upper()
            if collapse:
                merged[upper_key] = merged.get(upper_key, 0) + int(value)
            else:
                merged[upper_key] = int(value)
        return merged

    # String file path -- each FASTA record counts once.
    if isinstance(obj, str) and os.path.isfile(obj):
        counts: Dict[str, int] = {}
        for _, rec_seq in _parse_fasta(obj):
            rec_seq = rec_seq.upper()
            counts[rec_seq] = counts.get(rec_seq, 0) + 1
        return counts

    # Pandas DataFrame handling (pandas is optional).
    try:
        import pandas as pd
    except ImportError:
        pd = None
    if pd is not None and isinstance(obj, pd.DataFrame):
        if "sequence" in obj.columns and "abundance" in obj.columns:
            table: Dict[str, int] = {}
            for s, a in zip(obj["sequence"], obj["abundance"]):
                s_up = str(s).upper()
                if collapse:
                    table[s_up] = table.get(s_up, 0) + int(a)
                else:
                    table[s_up] = int(a)
            return table
        # Sequence table: column names are seqs, values are counts.
        col_sums = obj.sum(axis=0)
        return {str(s).upper(): int(a) for s, a in col_sums.items()}

    # Generic iterable of strings -- count occurrences.
    if hasattr(obj, "__iter__"):
        tallies: Dict[str, int] = {}
        for item in obj:
            item = str(item).upper()
            tallies[item] = tallies.get(item, 0) + 1
        return tallies

    raise TypeError(
        "Cannot extract uniques from object of type "
        f"{type(obj).__name__}."
    )

get_errors

get_errors(obj, detailed=False, enforce=True)

Extract error rate information from various dada2 object types.

Mirrors R's getErrors.

Supported inputs
  • numpy array: used directly as err_out.
  • dada result dict: extracts err_out, err_in, trans.
  • list of dada result dicts: verifies all share the same err_out, accumulates trans across samples.

Parameters:

Name Type Description Default
obj

Input object containing error information.

required
detailed bool

If True, return a dict with err_out, err_in, and trans keys. If False (default), return just the err_out matrix.

False
enforce bool

If True, validate that the error matrix has 16 rows, is numeric, and all values are in [0, 1].

True

Returns:

Type Description

numpy array (16, ncol) if detailed=False, or a dict with

err_out, err_in, trans keys if detailed=True.

Source code in papa2/utils.py
def get_errors(
    obj,
    detailed: bool = False,
    enforce: bool = True,
):
    """Extract error rate information from various dada2 object types.

    Mirrors R's ``getErrors``.

    Supported inputs:
        - numpy array: used directly as ``err_out``.
        - dada result dict: extracts ``err_out``, ``err_in``, ``trans``.
        - list of dada result dicts: verifies all share the same ``err_out``,
          accumulates ``trans`` across samples.

    Args:
        obj: Input object containing error information.
        detailed: If True, return a dict with ``err_out``, ``err_in``, and
            ``trans`` keys.  If False (default), return just the ``err_out``
            matrix.
        enforce: If True, validate that the error matrix has 16 rows,
            is numeric, and all values are in [0, 1].

    Returns:
        numpy array (16, ncol) if ``detailed=False``, or a dict with
        ``err_out``, ``err_in``, ``trans`` keys if ``detailed=True``.
    """
    err_out = err_in = trans = None

    if isinstance(obj, np.ndarray):
        # Raw matrix: use it directly.
        err_out = obj

    elif isinstance(obj, dict):
        err_out = obj.get("err_out")
        if err_out is None and "err" in obj:
            # Fall back to an "err" key when "err_out" is absent.
            err_out = obj["err"]
        err_in = obj.get("err_in")
        trans = obj.get("trans")

    elif isinstance(obj, list):
        if not obj:
            raise ValueError("Empty list provided to get_errors.")

        # The first element provides the reference err_out / err_in.
        head = obj[0]
        if not isinstance(head, dict):
            raise TypeError(
                f"Expected list of dicts, got list of {type(head).__name__}."
            )
        err_out = head.get("err_out")
        err_in = head.get("err_in")
        trans = head.get("trans")
        if trans is not None:
            trans = trans.copy().astype(np.float64)

        # Remaining elements must agree on err_out; their trans matrices
        # are summed across samples.
        for entry in obj[1:]:
            if not isinstance(entry, dict):
                raise TypeError(
                    f"Expected dict in list, got {type(entry).__name__}."
                )
            entry_err = entry.get("err_out")
            if (entry_err is not None and err_out is not None
                    and not np.array_equal(entry_err, err_out)):
                raise ValueError(
                    "Not all dada results share the same err_out."
                )
            entry_trans = entry.get("trans")
            if entry_trans is not None:
                if trans is None:
                    trans = entry_trans.copy().astype(np.float64)
                else:
                    trans = trans + entry_trans.astype(np.float64)
    else:
        raise TypeError(
            f"Cannot extract errors from object of type "
            f"{type(obj).__name__}."
        )

    if err_out is None:
        raise ValueError("Could not extract error matrix from input.")

    err_out = np.asarray(err_out, dtype=np.float64)

    if enforce:
        if err_out.ndim != 2 or err_out.shape[0] != 16:
            raise ValueError(
                f"Error matrix must have 16 rows, got shape {err_out.shape}."
            )
        if not np.issubdtype(err_out.dtype, np.number):
            raise ValueError("Error matrix must be numeric.")
        if np.any(err_out < 0.0) or np.any(err_out > 1.0):
            raise ValueError(
                "Error matrix values must be in [0, 1]."
            )

    if not detailed:
        return err_out
    return {
        "err_out": err_out,
        "err_in": None if err_in is None else np.asarray(err_in, dtype=np.float64),
        "trans": None if trans is None else np.asarray(trans, dtype=np.float64),
    }

merge_sequence_tables

merge_sequence_tables(*tables, order_by='abundance', try_rc=False)

Merge multiple sequence tables into one.

Mirrors R's mergeSequenceTables.

Each table is a dict with
  • 'table': numpy array (nsamples x nseqs)
  • 'seqs': list of sequence strings

Sequences present in multiple tables have their counts summed. If try_rc is True, reverse-complement sequences are detected and re-oriented to match the majority orientation.

Parameters:

Name Type Description Default
*tables

Variable number of sequence table dicts.

()
order_by str

Column ordering: "abundance" (total counts, descending) or "nsamples" (number of samples present, descending).

'abundance'
try_rc bool

If True, check for reverse-complement sequences across tables and re-orient them before merging.

False

Returns:

Type Description

A dict with 'table' (numpy array) and 'seqs' (list of str).

Source code in papa2/utils.py
def merge_sequence_tables(
    *tables,
    order_by: str = "abundance",
    try_rc: bool = False,
):
    """Merge multiple sequence tables into one.

    Mirrors R's ``mergeSequenceTables``.

    Each table is a dict with:
        - ``'table'``: numpy array (nsamples x nseqs)
        - ``'seqs'``: list of sequence strings

    Sequences present in multiple tables have their counts summed.
    If ``try_rc`` is True, reverse-complement sequences are detected
    and re-oriented to match the majority orientation.

    Args:
        *tables: Variable number of sequence table dicts.
        order_by: Column ordering: ``"abundance"`` (total counts, descending)
            or ``"nsamples"`` (number of samples present, descending).
        try_rc: If True, check for reverse-complement sequences across
            tables and re-orient them before merging.

    Returns:
        A dict with ``'table'`` (numpy array) and ``'seqs'`` (list of str).
    """
    if len(tables) == 0:
        raise ValueError("No tables provided to merge.")

    if len(tables) == 1:
        # Single input: returned as-is -- no reshaping, column reordering,
        # or sample-name generation happens in this case.
        return tables[0]

    # Determine canonical orientation for each sequence before building the
    # merged matrix so try_rc is independent of input order.
    all_seqs_ordered: List[str] = []  # unique sequences, first-seen order
    seen_seqs: set[str] = set()
    seq_totals: Dict[str, int] = {}  # total count per sequence, all tables
    first_seen: Dict[str, int] = {}  # discovery rank; breaks RC-total ties
    sample_names: List[str] = []

    # First pass: accumulate per-sequence totals, discovery order, and
    # sample names (auto-generated when a table carries none).
    for tbl_idx, tbl in enumerate(tables):
        tseqs = tbl["seqs"]
        tmat = np.asarray(tbl["table"])
        if tmat.ndim == 1:
            # A 1-D count vector is treated as a single-sample table.
            tmat = tmat.reshape(1, -1)
        totals = tmat.sum(axis=0)
        for seq, total in zip(tseqs, totals):
            seq_totals[seq] = seq_totals.get(seq, 0) + int(total)
            if seq not in seen_seqs:
                seen_seqs.add(seq)
                first_seen[seq] = len(all_seqs_ordered)
                all_seqs_ordered.append(seq)

        tbl_sample_names = tbl.get("sample_names")
        if tbl_sample_names:
            sample_names.extend(list(tbl_sample_names))
        else:
            # Fabricate stable names like "table2_sample3" when absent.
            for sample_idx in range(tmat.shape[0]):
                sample_names.append(f"table{tbl_idx + 1}_sample{sample_idx + 1}")

    canonical_seq: Dict[str, str] = {}  # sequence -> chosen orientation
    all_seqs_set: Dict[str, int] = {}   # canonical sequence -> column index
    canonical_order: List[str] = []
    for seq in all_seqs_ordered:
        if seq in canonical_seq:
            # Already assigned when its reverse complement was processed.
            continue
        canon = seq
        if try_rc:
            seq_rc = _rc(seq)
            if seq_rc in seq_totals and seq_rc != seq:
                # Both orientations occur: canonicalize to the higher-total
                # one; on a tie, prefer whichever was seen first.
                seq_total = seq_totals.get(seq, 0)
                rc_total = seq_totals.get(seq_rc, 0)
                if rc_total > seq_total:
                    canon = seq_rc
                elif rc_total == seq_total and first_seen.get(seq_rc, float("inf")) < first_seen.get(seq, float("inf")):
                    canon = seq_rc
                canonical_seq[seq_rc] = canon
        canonical_seq[seq] = canon
        if canon not in all_seqs_set:
            all_seqs_set[canon] = len(canonical_order)
            canonical_order.append(canon)

    n_seqs = len(canonical_order)
    seq_index = {s: i for i, s in enumerate(canonical_order)}

    # Second pass: build merged count matrix
    all_sample_counts: List[np.ndarray] = []

    for tbl in tables:
        tseqs = tbl["seqs"]
        tmat = np.asarray(tbl["table"])
        if tmat.ndim == 1:
            tmat = tmat.reshape(1, -1)
        nsamples = tmat.shape[0]

        # One output row per input sample; a sequence and its re-oriented
        # reverse complement land in the same output column.
        for si in range(nsamples):
            row = np.zeros(n_seqs, dtype=np.int64)
            for ji, seq in enumerate(tseqs):
                canon = canonical_seq.get(seq, seq)
                idx = seq_index[canon]
                row[idx] += tmat[si, ji]
            all_sample_counts.append(row)

    merged_mat = np.vstack(all_sample_counts)

    # Order columns
    if order_by == "abundance":
        col_totals = merged_mat.sum(axis=0)
        order = np.argsort(-col_totals)
    elif order_by == "nsamples":
        n_present = (merged_mat > 0).sum(axis=0)
        order = np.argsort(-n_present)
    else:
        # Any other order_by value keeps the canonical (first-seen) order.
        order = np.arange(n_seqs)

    merged_mat = merged_mat[:, order]
    merged_seqs = [canonical_order[i] for i in order]

    return {
        "table": merged_mat,
        "seqs": merged_seqs,
        "sample_names": sample_names,
    }

nwhamming

nwhamming(s1, s2, **kwargs)

Compute Hamming distance between sequences via NW alignment.

Aligns the two sequences using nwalign and then counts mismatches and indels using eval_pair.

Vectorized: if s1 and s2 are both lists (of the same length), returns a list of distances.

Parameters:

Name Type Description Default
s1

DNA sequence string, or list of strings.

required
s2

DNA sequence string, or list of strings.

required
**kwargs

Additional keyword arguments passed to nwalign (e.g. match, mismatch, gap_p, band).

{}

Returns:

Name Type Description
int (scalar inputs) or list of int (list inputs)

mismatch + indel

count for each pair.

Source code in papa2/utils.py
def nwhamming(s1, s2, **kwargs):
    """Compute Hamming distance between sequences via NW alignment.

    Aligns the two sequences using ``nwalign`` and then counts mismatches
    and indels using ``eval_pair``.

    Vectorized: if *s1* and *s2* are both lists (of the same length),
    returns a list of distances.

    Args:
        s1: DNA sequence string, or list of strings.
        s2: DNA sequence string, or list of strings.
        **kwargs: Additional keyword arguments passed to ``nwalign``
            (e.g. ``match``, ``mismatch``, ``gap_p``, ``band``).

    Returns:
        int (scalar inputs) or list of int (list inputs): mismatch + indel
        count for each pair.
    """
    from ._cdada import nwalign as _nwalign, eval_pair as _eval_pair

    def _pair_distance(a, b):
        """Align one pair and return mismatches + indels."""
        aligned_a, aligned_b = _nwalign(a, b, **kwargs)
        _, n_mismatch, n_indel = _eval_pair(aligned_a, aligned_b)
        return n_mismatch + n_indel

    if isinstance(s1, (list, tuple)):
        if not isinstance(s2, (list, tuple)) or len(s1) != len(s2):
            raise ValueError(
                "s1 and s2 must be lists of the same length for vectorized nwhamming."
            )
        return [_pair_distance(a, b) for a, b in zip(s1, s2)]

    return _pair_distance(s1, s2)

is_shift_denovo

is_shift_denovo(unqs, min_overlap=20, verbose=False)

Check if sequences are shifted versions of more-abundant sequences.

For each sequence (sorted by decreasing abundance), check whether it is a "shift" of any more-abundant sequence. A shifted pair has:

  • match < len(sq1) AND match < len(sq2)
  • match >= min_overlap
  • mismatch == 0 AND indel == 0

This identifies reads that are identical subsequences but offset (shifted) relative to each other.

Parameters:

Name Type Description Default
unqs

dict {sequence: abundance} (as from get_uniques), or a list of sequences sorted by decreasing abundance.

required
min_overlap int

Minimum overlap (match) length to call a shift.

20
verbose bool

If True, log details about detected shifts.

False

Returns:

Type Description
ndarray

Boolean numpy array, True where a sequence is a shifted duplicate

ndarray

of a more-abundant sequence.

Source code in papa2/utils.py
def is_shift_denovo(
    unqs,
    min_overlap: int = 20,
    verbose: bool = False,
) -> np.ndarray:
    """Flag sequences that are shifted copies of more-abundant sequences.

    Walking the sequences in order of decreasing abundance, each one is
    aligned against every more-abundant sequence not already flagged as
    a shift.  A pair counts as a "shift" when the alignment has:

    - ``match < len(sq1)`` AND ``match < len(sq2)``
    - ``match >= min_overlap``
    - ``mismatch == 0`` AND ``indel == 0``

    i.e. the two reads are identical over their overlap but offset
    relative to one another.

    Args:
        unqs: dict ``{sequence: abundance}`` (as from ``get_uniques``),
            or a list of sequences sorted by decreasing abundance.
        min_overlap: Minimum overlap (match) length to call a shift.
        verbose: If True, log details about detected shifts.

    Returns:
        Boolean numpy array, True where a sequence is a shifted duplicate
        of a more-abundant sequence.
    """
    from ._cdada import nwalign as _nwalign, eval_pair as _eval_pair

    if isinstance(unqs, dict):
        # Order sequences by decreasing abundance.
        ordered = sorted(unqs.items(), key=lambda kv: kv[1], reverse=True)
        seqs = [sq for sq, _ in ordered]
    else:
        # Lists/tuples (and other iterables) are assumed pre-sorted.
        seqs = list(unqs)

    shifted = np.zeros(len(seqs), dtype=bool)

    for i, sq_i in enumerate(seqs):
        if i == 0:
            # The most abundant sequence cannot be a shift of anything.
            continue
        for j, sq_j in enumerate(seqs[:i]):
            if shifted[j]:
                # Only compare against non-shifted (parent) sequences.
                continue

            al1, al2 = _nwalign(sq_j, sq_i)
            nmatch, nmismatch, nindel = _eval_pair(al1, al2)

            pair_is_shift = (
                nmismatch == 0
                and nindel == 0
                and nmatch >= min_overlap
                and nmatch < len(sq_j)
                and nmatch < len(sq_i)
            )
            if pair_is_shift:
                shifted[i] = True
                if verbose:
                    logger.info(
                        "[INFO] is_shift_denovo: seq %d is a shift of seq %d "
                        "(overlap=%d)", i, j, nmatch
                    )
                break

    return shifted

plot_errors

plot_errors(dq, nti=('A', 'C', 'G', 'T'), ntj=('A', 'C', 'G', 'T'), obs=True, err_out=True, err_in=False, nominal_q=False, output=None)

Error rate diagnostic plot using plotly.

Mirrors R's plotErrors.

Creates a faceted plot showing error rates for each nucleotide transition (from_nuc -> to_nuc). Self-transitions are blanked out.

Parameters:

Name Type Description Default
dq

A dada result dict (with err_out, optionally err_in and trans), or a numpy error matrix (16, ncol).

required
nti Tuple[str, ...]

Tuple of source nucleotides to include.

('A', 'C', 'G', 'T')
ntj Tuple[str, ...]

Tuple of target nucleotides to include.

('A', 'C', 'G', 'T')
obs bool

If True, show observed error rates as scatter points.

True
err_out bool

If True, show the estimated (output) error rates as a line.

True
err_in bool

If True, show the input error rates as a dashed line.

False
nominal_q bool

If True, show nominal Q-score error rates as a red line.

False
output Optional[str]

If given, save to this path (.html for interactive, .png/.svg/.pdf require kaleido). If None, returns the plotly Figure object.

None

Returns:

Type Description

plotly.graph_objects.Figure if output is None, else None.

Source code in papa2/utils.py
def plot_errors(
    dq,
    nti: Tuple[str, ...] = ("A", "C", "G", "T"),
    ntj: Tuple[str, ...] = ("A", "C", "G", "T"),
    obs: bool = True,
    err_out: bool = True,
    err_in: bool = False,
    nominal_q: bool = False,
    output: Optional[str] = None,
):
    """Error rate diagnostic plot using plotly.

    Mirrors R's ``plotErrors``.

    Creates a faceted plot showing error rates for each nucleotide
    transition (from_nuc -> to_nuc).  Self-transitions are blanked out.

    Args:
        dq: A dada result dict (with ``err_out``, optionally ``err_in``
            and ``trans``), or a numpy error matrix (16, ncol).
        nti: Tuple of source nucleotides to include.
        ntj: Tuple of target nucleotides to include.
        obs: If True, show observed error rates as scatter points.
        err_out: If True, show the estimated (output) error rates as a line.
        err_in: If True, show the input error rates as a dashed line.
        nominal_q: If True, show nominal Q-score error rates as a red line.
        output: If given, save to this path (.html for interactive,
            .png/.svg/.pdf require kaleido).  If None, returns the
            plotly Figure object.

    Returns:
        plotly.graph_objects.Figure if output is None, else None.
    """
    try:
        import plotly.graph_objects as go
        from plotly.subplots import make_subplots
    except ImportError:
        raise ImportError("plotly is required for plot_errors: pip install plotly")

    nt_map = {"A": 0, "C": 1, "G": 2, "T": 3}

    # Extract error info
    err_info = get_errors(dq, detailed=True, enforce=False)
    err_out_mat = err_info["err_out"]
    err_in_mat = err_info.get("err_in", None) if err_in else None
    trans_mat = err_info.get("trans", None) if obs else None

    # Hoisted out of the panel loop (loop-invariant); only needed when
    # observed rates are actually plotted.
    if obs and trans_mat is not None:
        from .error import _BASE_ROWS

    ncol = err_out_mat.shape[1]
    qq = np.arange(ncol)

    nti_indices = [nt_map[nt] for nt in nti]
    ntj_indices = [nt_map[nt] for nt in ntj]

    n_rows = len(nti_indices)
    n_cols = len(ntj_indices)

    subplot_titles = []
    for ni in nti:
        for nj in ntj:
            subplot_titles.append(f"{ni} -> {nj}" if ni != nj else "")

    fig = make_subplots(
        rows=n_rows, cols=n_cols,
        subplot_titles=subplot_titles,
        shared_xaxes=True, shared_yaxes=True,
        horizontal_spacing=0.03, vertical_spacing=0.05,
    )

    # Show the legend exactly once, on the first off-diagonal panel.
    # (A hard-coded panel position would never show a legend when
    # len(ntj) == 1, or when that panel is a blanked self-transition.)
    legend_shown = False

    for ri, ni_idx in enumerate(nti_indices):
        for ci, nj_idx in enumerate(ntj_indices):
            row_plot = ri + 1
            col_plot = ci + 1
            # Row index into the 16-row (from_nuc x to_nuc) matrices.
            row_idx = ni_idx * 4 + nj_idx

            if ni_idx == nj_idx:
                # Self-transition: blank
                continue

            show_legend = not legend_shown
            legend_shown = True

            # Observed error rates (scatter)
            if obs and trans_mat is not None:
                base_rows = _BASE_ROWS[ni_idx]
                # Observed rate = transition counts / total counts from
                # this source nucleotide, per quality-score column.
                tot = trans_mat[base_rows].sum(axis=0).astype(np.float64)
                errs = trans_mat[row_idx].astype(np.float64)
                with np.errstate(divide="ignore", invalid="ignore"):
                    obs_rate = errs / tot
                obs_rate[~np.isfinite(obs_rate)] = np.nan

                # Point sizes proportional to sqrt(total counts)
                sizes = np.sqrt(np.maximum(tot, 0)) / np.sqrt(np.nanmax(tot) + 1) * 12 + 2

                fig.add_trace(
                    go.Scatter(
                        x=qq, y=obs_rate,
                        mode="markers",
                        marker=dict(
                            size=sizes,
                            color="rgba(0, 0, 0, 0.4)",
                        ),
                        name="Observed",
                        showlegend=show_legend,
                        legendgroup="obs",
                    ),
                    row=row_plot, col=col_plot,
                )

            # Estimated error rates (line)
            if err_out:
                fig.add_trace(
                    go.Scatter(
                        x=qq, y=err_out_mat[row_idx],
                        mode="lines",
                        line=dict(color="#1f77b4", width=2),
                        name="Estimated",
                        showlegend=show_legend,
                        legendgroup="est",
                    ),
                    row=row_plot, col=col_plot,
                )

            # Input error rates (dashed line)
            if err_in and err_in_mat is not None:
                fig.add_trace(
                    go.Scatter(
                        x=qq, y=err_in_mat[row_idx],
                        mode="lines",
                        line=dict(color="#ff7f0e", width=1.5, dash="dash"),
                        name="Input",
                        showlegend=show_legend,
                        legendgroup="in",
                    ),
                    row=row_plot, col=col_plot,
                )

            # Nominal Q-score error rates (red line): err = 10^(-Q/10)
            if nominal_q:
                nominal_rates = 10.0 ** (-qq / 10.0)
                fig.add_trace(
                    go.Scatter(
                        x=qq, y=nominal_rates,
                        mode="lines",
                        line=dict(color="red", width=1, dash="dot"),
                        name="Nominal Q",
                        showlegend=show_legend,
                        legendgroup="nom",
                    ),
                    row=row_plot, col=col_plot,
                )

            # Log scale for y-axis
            fig.update_yaxes(type="log", row=row_plot, col=col_plot)

    fig.update_layout(
        title="Error rates by nucleotide transition",
        height=200 * n_rows + 100,
        width=200 * n_cols + 100,
    )
    fig.update_xaxes(title_text="Quality Score")
    fig.update_yaxes(title_text="Error Rate")

    if output:
        if output.endswith(".html"):
            fig.write_html(output)
        else:
            fig.write_image(output)
        return None

    return fig

plot_complexity

plot_complexity(files, kmer_size=2, n=100000, bins=100, output=None)

Sequence complexity histogram using plotly.

Samples n reads from each FASTQ file, computes sequence complexity (Shannon effective number of kmers), and plots a faceted histogram.

Parameters:

Name Type Description Default
files Union[str, List[str]]

One or more FASTQ file paths (may be gzipped).

required
kmer_size int

Kmer size for complexity calculation (default 2).

2
n int

Maximum number of reads to sample per file.

100000
bins int

Number of histogram bins.

100
output Optional[str]

If given, save to this path (.html for interactive, .png/.svg/.pdf require kaleido). If None, returns the plotly Figure object.

None

Returns:

Type Description

plotly.graph_objects.Figure if output is None, else None.

Source code in papa2/utils.py
def plot_complexity(
    files: Union[str, List[str]],
    kmer_size: int = 2,
    n: int = 100000,
    bins: int = 100,
    output: Optional[str] = None,
):
    """Sequence complexity histogram using plotly.

    Reads up to *n* reads from each FASTQ file, computes per-read
    sequence complexity (Shannon effective number of kmers), and draws
    one histogram facet per file.

    Args:
        files: One or more FASTQ file paths (may be gzipped).
        kmer_size: Kmer size for complexity calculation (default 2).
        n: Maximum number of reads to sample per file.
        bins: Number of histogram bins.
        output: If given, save to this path (.html for interactive,
            .png/.svg/.pdf require kaleido).  If None, returns the
            plotly Figure object.

    Returns:
        plotly.graph_objects.Figure if output is None, else None.
    """
    try:
        from plotly.subplots import make_subplots
        import plotly.graph_objects as go
    except ImportError:
        raise ImportError("plotly is required for plot_complexity: pip install plotly")

    file_list = [files] if isinstance(files, str) else files
    n_files = len(file_list)

    fig = make_subplots(
        rows=n_files, cols=1,
        subplot_titles=[os.path.basename(f) for f in file_list],
        shared_xaxes=True,
        vertical_spacing=0.05 if n_files > 1 else 0.0,
    )

    for row, path in enumerate(file_list, start=1):
        # Pull up to n sequences out of the FASTQ (4 lines per record).
        sequences: List[str] = []
        open_fn = gzip.open if path.endswith(".gz") else open
        with open_fn(path, "rt") as handle:
            for _ in range(n):
                header = handle.readline()
                if not header:
                    break
                seq = handle.readline().rstrip("\n\r")
                handle.readline()  # '+' separator line (ignored)
                handle.readline()  # quality line (ignored)
                if not seq:
                    break
                sequences.append(seq)

        fig.add_trace(
            go.Histogram(
                x=seq_complexity(sequences, kmer_size=kmer_size),
                nbinsx=bins,
                name=os.path.basename(path),
                showlegend=True,
            ),
            row=row, col=1,
        )

    fig.update_layout(
        title="Sequence Complexity",
        height=300 * n_files + 100,
        width=700,
    )
    fig.update_xaxes(title_text="Complexity (Shannon effective kmers)")
    fig.update_yaxes(title_text="Count")

    if output:
        if output.endswith(".html"):
            fig.write_html(output)
        else:
            fig.write_image(output)
        return None

    return fig

plot_sankey

plot_sankey(track, title='Read tracking through papa2 pipeline', output=None, width=900, height=500)

Create a Sankey diagram showing read flow through pipeline stages.

Parameters:

Name Type Description Default
track dict

Dict mapping stage names to read/sequence counts. Typical keys (in order): {"input": 50000, "filtered": 45000, "denoised": 42000, "merged": 40000, "non-chimeric": 38000}

Can also be a list of such dicts (one per sample) — values will be summed across samples.

Or a pandas DataFrame with stage columns and sample rows (as produced by a read-tracking table).

required
title str

Plot title.

'Read tracking through papa2 pipeline'
output Optional[str]

If given, save to this path (.html for interactive, .png/.svg/.pdf require kaleido). If None, returns the plotly Figure object.

None
width int

Figure width in pixels.

900
height int

Figure height in pixels.

500

Returns:

Type Description

plotly.graph_objects.Figure if output is None, else None.

Source code in papa2/utils.py
def plot_sankey(
    track: dict,
    title: str = "Read tracking through papa2 pipeline",
    output: Optional[str] = None,
    width: int = 900,
    height: int = 500,
):
    """Create a Sankey diagram showing read flow through pipeline stages.

    Each stage is a node; reads retained between consecutive stages flow
    to the next stage node, while reads lost between stages flow into a
    dedicated red "lost" node.

    Args:
        track: Dict mapping stage names to read/sequence counts, e.g.
            ``{"input": 50000, "filtered": 45000, "denoised": 42000,
               "merged": 40000, "non-chimeric": 38000}``.
            A list of such dicts (one per sample) is summed per key.
            A pandas DataFrame with stage columns is summed per column.
        title: Plot title.
        output: If given, save to this path (.html for interactive,
            .png/.svg/.pdf require kaleido). If None, returns the
            plotly Figure object.
        width: Figure width in pixels.
        height: Figure height in pixels.

    Returns:
        plotly.graph_objects.Figure if output is None, else None.

    Raises:
        TypeError: If ``track`` is none of the accepted forms.
        ValueError: If fewer than two stages are present.
    """
    try:
        import plotly.graph_objects as go
    except ImportError:
        raise ImportError("plotly is required for plot_sankey: pip install plotly")

    # Coerce the accepted inputs into parallel stage/count lists.
    if hasattr(track, "iterrows"):  # pandas DataFrame
        stages = list(track.columns)
        counts = [int(track[col].sum()) for col in stages]
    elif isinstance(track, list):
        # Keys are taken from the first sample; missing keys count as 0.
        stages = list(track[0].keys())
        counts = [sum(entry.get(stage, 0) for entry in track) for stage in stages]
    elif isinstance(track, dict):
        stages = list(track.keys())
        counts = [int(track[k]) for k in stages]
    else:
        raise TypeError(f"track must be a dict, list of dicts, or DataFrame, got {type(track)}")

    if len(stages) < 2:
        raise ValueError("Need at least 2 stages for a Sankey diagram")

    # Color palette: blues for pipeline stages, greens for taxonomy ranks, red for lost
    stage_colors = [
        "#2196F3", "#1E88E5", "#1976D2", "#1565C0",
        "#0D47A1", "#0A3D91",
        # taxonomy ranks (greens)
        "#4CAF50", "#43A047", "#388E3C", "#2E7D32",
        "#1B5E20", "#145214", "#0E3E0E",
    ]
    lost_color = "#EF5350"

    # Nodes: one per stage, plus one "lost" node per lossy transition.
    node_labels = list(stages)
    node_colors = [stage_colors[i % len(stage_colors)] for i in range(len(stages))]
    for i in range(len(stages) - 1):
        if counts[i] - counts[i + 1] > 0:
            node_labels.append(f"lost ({stages[i]}{stages[i+1]})")
            node_colors.append(lost_color)

    # Links: retained flow to the next stage, lost flow to its lost node.
    source = []
    target = []
    value = []
    link_colors = []
    next_lost = len(stages)  # index of the next unused "lost" node
    for i in range(len(stages) - 1):
        kept = counts[i + 1]
        dropped = counts[i] - counts[i + 1]
        if kept > 0:
            source.append(i)
            target.append(i + 1)
            value.append(kept)
            link_colors.append("rgba(33, 150, 243, 0.4)")
        if dropped > 0:
            source.append(i)
            target.append(next_lost)
            value.append(dropped)
            link_colors.append("rgba(239, 83, 80, 0.4)")
            next_lost += 1

    # Human-friendly count formatting for node labels.
    def fmt(n):
        if n >= 1_000_000:
            return f"{n/1_000_000:.1f}M"
        if n >= 1_000:
            return f"{n/1_000:.1f}K"
        return str(n)

    node_labels_fmt = []
    for idx, label in enumerate(node_labels):
        if idx < len(counts):
            amount = counts[idx]
        else:
            # Lost node: recover its flow value from the link feeding it.
            amount = 0
            for li, tgt in enumerate(target):
                if tgt == idx:
                    amount = value[li]
                    break
        node_labels_fmt.append(f"{label}<br>{fmt(amount)}")

    fig = go.Figure(data=[go.Sankey(
        node=dict(
            pad=20,
            thickness=30,
            line=dict(color="black", width=0.5),
            label=node_labels_fmt,
            color=node_colors,
        ),
        link=dict(
            source=source,
            target=target,
            value=value,
            color=link_colors,
        ),
    )])

    fig.update_layout(
        title_text=title,
        font_size=13,
        width=width,
        height=height,
    )

    if output:
        if output.endswith(".html"):
            fig.write_html(output)
        else:
            fig.write_image(output)
        return None

    return fig

track_reads

track_reads(dereps=None, dadas=None, mergers=None, seqtab=None, seqtab_nochim=None, taxa=None)

Build a read-tracking dict from pipeline stage outputs.

Pass whichever stages you have — earlier stages are required for later ones to make sense, but all are optional.

Parameters:

Name Type Description Default
dereps

List of derep dicts (from derep_fastq)

None
dadas

List of dada result dicts

None
mergers

List of merger lists (from merge_pairs)

None
seqtab

Sequence table (dict with 'table'/'seqs', DataFrame, or array)

None
seqtab_nochim

Chimera-filtered sequence table (same formats as seqtab)

None
taxa

Taxonomy array from assign_species / add_species, shape (N, K). Used together with seqtab_nochim (or seqtab) to count reads assigned to classified ASVs.

None

Returns:

Type Description

Dict mapping stage names to total read counts.

Pass directly to plot_sankey().

Source code in papa2/utils.py
def track_reads(
    dereps=None,
    dadas=None,
    mergers=None,
    seqtab=None,
    seqtab_nochim=None,
    taxa=None,
):
    """Build a read-tracking dict from pipeline stage outputs.

    Pass whichever stages you have — earlier stages are required for
    later ones to make sense, but all are optional.

    Args:
        dereps: List of derep dicts (from derep_fastq), or a single dict.
        dadas: List of dada result dicts, or a single dict.
        mergers: List of merger lists (from merge_pairs)
        seqtab: Sequence table (dict with 'table'/'seqs', DataFrame, or array)
        seqtab_nochim: Chimera-filtered sequence table (same formats as seqtab)
        taxa: Taxonomy array from assign_species / add_species, shape (N, K).
            Used together with seqtab_nochim (or seqtab) to count reads
            assigned to classified ASVs.  None, empty-string, and NaN
            entries count as unclassified.

    Returns:
        Dict mapping stage names to total read counts.
        Pass directly to plot_sankey().
    """
    track = {}
    if dereps is not None:
        if isinstance(dereps, dict):
            # Accept a single derep dict, mirroring the `dadas` branch.
            dereps = [dereps]
        track["input"] = int(sum(d["abundances"].sum() for d in dereps))
    if dadas is not None:
        if isinstance(dadas, dict):
            dadas = [dadas]
        track["denoised"] = int(sum(sum(d["denoised"].values()) for d in dadas))
    if mergers is not None:
        # Count only merged pairs flagged as accepted (default True).
        total = 0
        for m in mergers:
            if isinstance(m, list):
                total += sum(x["abundance"] for x in m if x.get("accept", True))
            elif isinstance(m, dict) and "abundance" in m:
                if m.get("accept", True):
                    total += m["abundance"]
        track["merged"] = int(total)
    if seqtab is not None:
        track["tabled"] = _seqtab_sum(seqtab)
    if seqtab_nochim is not None:
        track["non-chimeric"] = _seqtab_sum(seqtab_nochim)
    if taxa is not None:
        # Break down reads by taxonomic rank.
        # taxa can be:
        #   - pandas DataFrame with rank columns (Kingdom, Phylum, ..., Species)
        #   - numpy array of shape (N_asvs, K_ranks)
        #   - numpy array of shape (N_asvs,) for single-rank
        import numpy as np

        # Get per-ASV read totals from the best available table
        ref_st = seqtab_nochim if seqtab_nochim is not None else seqtab
        if ref_st is not None:
            if isinstance(ref_st, dict) and "table" in ref_st:
                tbl = ref_st["table"]
            elif hasattr(ref_st, "values"):
                tbl = ref_st.values
            else:
                tbl = np.asarray(ref_st)
            per_asv = tbl.sum(axis=0) if tbl.ndim == 2 else tbl

            # Determine rank names and values
            if hasattr(taxa, "columns"):
                # pandas DataFrame — use column names
                rank_names = list(taxa.columns)
                taxa_vals = taxa.values
            else:
                taxa_vals = np.asarray(taxa)
                if taxa_vals.ndim == 1:
                    taxa_vals = taxa_vals.reshape(-1, 1)
                rank_names = [
                    "Kingdom", "Phylum", "Class", "Order",
                    "Family", "Genus", "Species",
                ][:taxa_vals.shape[1]]

            def _classified(v):
                """True if a taxonomy entry represents an actual assignment.

                Unassigned ranks can be None, empty/whitespace strings, or
                (from pandas) float NaN.  The previous check missed NaN:
                str(nan) == 'nan' is non-empty, so unassigned ASVs were
                counted as classified.
                """
                if v is None:
                    return False
                if isinstance(v, float) and np.isnan(v):
                    return False
                return str(v).strip() != ""

            if len(per_asv) == taxa_vals.shape[0]:
                for k, rank in enumerate(rank_names):
                    mask = np.array(
                        [_classified(taxa_vals[i, k])
                         for i in range(taxa_vals.shape[0])]
                    )
                    track[rank.lower()] = int(per_asv[mask].sum())
    return track

remove_primers

remove_primers(fn, fout, primer_fwd, primer_rev=None, max_mismatch=2, trim_fwd=True, trim_rev=True, orient=True, compress=True, verbose=False)

Match and trim primer sequences from FASTQ reads.

Reads the input FASTQ file, matches the forward primer at the start of each read and the reverse-complement of the reverse primer at the end. Reads that match are trimmed (if trim_fwd / trim_rev) and written to the output file.

If orient=True, reads that don't match in forward orientation are checked in reverse-complement and flipped if they match.

Parameters:

Name Type Description Default
fn str

Input FASTQ file path (may be gzipped).

required
fout str

Output FASTQ file path.

required
primer_fwd str

Forward primer sequence (5' to 3').

required
primer_rev Optional[str]

Reverse primer sequence (5' to 3'), optional. Its reverse complement is matched at the 3' end of reads.

None
max_mismatch int

Maximum allowed mismatches per primer match.

2
trim_fwd bool

If True, trim the matched forward primer region.

True
trim_rev bool

If True, trim the matched reverse primer region.

True
orient bool

If True, check reverse complement of reads and flip if primers match in that orientation.

True
compress bool

If True and output path ends with .gz, gzip-compress the output.

True
verbose bool

If True, log progress and match statistics.

False

Returns:

Type Description
Tuple[int, int]

Tuple (reads_in, reads_out) with the number of reads read
and the number of reads written.

Source code in papa2/utils.py
def remove_primers(
    fn: str,
    fout: str,
    primer_fwd: str,
    primer_rev: Optional[str] = None,
    max_mismatch: int = 2,
    trim_fwd: bool = True,
    trim_rev: bool = True,
    orient: bool = True,
    compress: bool = True,
    verbose: bool = False,
) -> Tuple[int, int]:
    """Match and trim primer sequences from FASTQ reads.

    Reads the input FASTQ file, matches the forward primer at the start
    of each read and the reverse-complement of the reverse primer at
    the end.  Reads that match are trimmed (if ``trim_fwd`` / ``trim_rev``)
    and written to the output file.

    If ``orient=True``, reads that don't match in forward orientation
    are checked in reverse-complement and flipped if they match.

    Reads that match in neither orientation, or that would be trimmed
    to zero length, are silently dropped: they count toward
    ``reads_in`` but not ``reads_out``.

    Args:
        fn: Input FASTQ file path (may be gzipped).
        fout: Output FASTQ file path.
        primer_fwd: Forward primer sequence (5' to 3').
        primer_rev: Reverse primer sequence (5' to 3'), optional.
            Its reverse complement is matched at the 3' end of reads.
        max_mismatch: Maximum allowed mismatches per primer match.
        trim_fwd: If True, trim the matched forward primer region.
        trim_rev: If True, trim the matched reverse primer region.
        orient: If True, check reverse complement of reads and flip
            if primers match in that orientation.
        compress: If True and output path ends with ``.gz``, gzip-compress
            the output.
        verbose: If True, log progress and match statistics.

    Returns:
        Tuple ``(reads_in, reads_out)`` with the number of reads read
        and the number of reads written.
    """
    # Matching is case-insensitive: primers and reads are upper-cased.
    primer_fwd = primer_fwd.upper()
    len_fwd = len(primer_fwd)

    if primer_rev is not None:
        primer_rev = primer_rev.upper()
        # The reverse primer appears reverse-complemented at the 3' end
        # of a read-through read.
        primer_rev_rc = _rc(primer_rev)
        len_rev_rc = len(primer_rev_rc)
    else:
        primer_rev_rc = None
        len_rev_rc = 0

    reads_in = 0
    reads_out = 0

    opener_in = gzip.open if fn.endswith(".gz") else open
    if compress and fout.endswith(".gz"):
        opener_out = gzip.open
        mode_out = "wt"
    else:
        opener_out = open
        mode_out = "w"

    with opener_in(fn, "rt") as fh_in, opener_out(fout, mode_out) as fh_out:
        while True:
            # Standard 4-line FASTQ record: header, sequence, '+', quality.
            header = fh_in.readline()
            if not header:
                break
            seq = fh_in.readline().rstrip("\n\r")
            plus = fh_in.readline()
            qual = fh_in.readline().rstrip("\n\r")
            if not seq:
                break

            reads_in += 1
            seq_upper = seq.upper()
            matched = False
            is_rc = False

            # Try forward orientation
            fwd_ok = False
            rev_ok = False

            if len(seq_upper) >= len_fwd:
                # NOTE(review): _hamming_match presumably compares two
                # equal-length strings allowing up to max_mismatch
                # mismatches — confirm against its definition.
                fwd_ok = _hamming_match(seq_upper[:len_fwd], primer_fwd, max_mismatch)

            if primer_rev_rc is not None and len(seq_upper) >= len_rev_rc:
                rev_ok = _hamming_match(
                    seq_upper[len(seq_upper) - len_rev_rc:],
                    primer_rev_rc, max_mismatch,
                )
            elif primer_rev_rc is None:
                rev_ok = True  # No reverse primer to check

            if fwd_ok and rev_ok:
                matched = True
            elif orient and not matched:
                # Try reverse complement
                seq_rc = _rc(seq_upper)
                # Quality string is reversed (not complemented) so it
                # stays aligned with the reverse-complemented bases.
                qual_rc = qual[::-1]

                fwd_ok_rc = False
                rev_ok_rc = False

                if len(seq_rc) >= len_fwd:
                    fwd_ok_rc = _hamming_match(seq_rc[:len_fwd], primer_fwd, max_mismatch)

                if primer_rev_rc is not None and len(seq_rc) >= len_rev_rc:
                    rev_ok_rc = _hamming_match(
                        seq_rc[len(seq_rc) - len_rev_rc:],
                        primer_rev_rc, max_mismatch,
                    )
                elif primer_rev_rc is None:
                    rev_ok_rc = True

                if fwd_ok_rc and rev_ok_rc:
                    matched = True
                    is_rc = True
                    # Emit the re-oriented (flipped) read, not the original.
                    seq = seq_rc
                    qual = qual_rc

            if not matched:
                continue

            # Trim primers.
            # NOTE: fwd_ok_rc is only bound when the reverse-complement
            # branch ran; the `is_rc` guard before it short-circuits the
            # expression so it is never read otherwise.
            start = len_fwd if (trim_fwd and fwd_ok) or (trim_fwd and is_rc and fwd_ok_rc) else 0
            end = len(seq) - len_rev_rc if (trim_rev and primer_rev_rc is not None) else len(seq)
            if end <= start:
                continue  # Trimming would leave nothing; drop the read.

            seq_out = seq[start:end]
            qual_out = qual[start:end]

            fh_out.write(header)
            fh_out.write(seq_out + "\n")
            # Separator line is normalized to a bare '+'.
            fh_out.write("+\n")
            fh_out.write(qual_out + "\n")
            reads_out += 1

    if verbose:
        logger.info(
            "[INFO] remove_primers: %d reads in, %d reads out (%.1f%% matched).",
            reads_in, reads_out,
            100.0 * reads_out / reads_in if reads_in > 0 else 0.0,
        )

    return reads_in, reads_out

papa2._cdada — C Bindings

Low-level ctypes bindings to the libpapa2.so shared library. Most users should not call these directly — they are used internally by the higher-level functions above.

run_dada

run_dada(seqs, abundances, err_mat, quals=None, priors=None, match=5, mismatch=-4, gap_pen=-8, use_kmers=True, kdist_cutoff=0.42, band_size=16, omega_a=1e-40, omega_p=0.0001, omega_c=1e-40, detect_singletons=False, max_clust=0, min_fold=1, min_hamming=1, min_abund=1, use_quals=True, vectorized_alignment=True, homo_gap_pen=-8, multithread=True, verbose=False, sse=2, gapless=True, greedy=True)

Call the C++ dada2 algorithm on dereplicated sequences.

Parameters:

Name Type Description Default
seqs

list of str, unique DNA sequences (ACGT only)

required
abundances

array-like of int, abundance per unique sequence

required
err_mat

numpy array (16, ncol), error rate matrix, row-major

required
quals

numpy array (nraw, maxlen) of avg quality scores, or None

None
priors

array-like of int (0/1), or None

None

Returns:

Type Description

dict with keys: cluster_seqs, cluster_abunds, trans, map, pval, etc.

Source code in papa2/_cdada.py
def run_dada(seqs, abundances, err_mat, quals=None, priors=None,
             match=5, mismatch=-4, gap_pen=-8,
             use_kmers=True, kdist_cutoff=0.42, band_size=16,
             omega_a=1e-40, omega_p=1e-4, omega_c=1e-40,
             detect_singletons=False, max_clust=0,
             min_fold=1, min_hamming=1, min_abund=1,
             use_quals=True, vectorized_alignment=True, homo_gap_pen=-8,
             multithread=True, verbose=False,
             sse=2, gapless=True, greedy=True):
    """Invoke the C++ dada2 denoising core on dereplicated sequences.

    Args:
        seqs: list of str, unique DNA sequences (ACGT only)
        abundances: array-like of int, abundance per unique sequence
        err_mat: numpy array (16, ncol), error rate matrix, row-major
        quals: numpy array (nraw, maxlen) of avg quality scores, or None
        priors: array-like of int (0/1), or None

    Returns:
        dict with keys: cluster_seqs, cluster_abunds, trans, map, pval, etc.
    """
    n_unique = len(seqs)

    # Nothing to cluster: hand back a well-formed empty result without
    # touching the C library at all.
    if n_unique == 0:
        return {"cluster_seqs": [], "cluster_abunds": np.array([], dtype=np.int32),
                "trans": np.zeros((16, 0), dtype=np.int32), "map": np.array([], dtype=np.int32),
                "pval": np.array([], dtype=np.float64)}

    # Marshal the unique sequences into an array of C strings.
    c_seqs = (ct.c_char_p * n_unique)()
    for idx, s in enumerate(seqs):
        if isinstance(s, str):
            c_seqs[idx] = s.encode("ascii")
        else:
            c_seqs[idx] = s

    # Per-sequence abundances as a contiguous int32 buffer.
    abunds = np.asarray(abundances, dtype=np.int32)

    # Prior flags: an all-zero vector stands in when the caller gave none.
    if priors is None:
        prior_flags = np.zeros(n_unique, dtype=np.int32)
    else:
        prior_flags = np.asarray(priors, dtype=np.int32)

    # Error-rate matrix must be C-contiguous float64 (16 rows, row-major).
    err_c = np.ascontiguousarray(err_mat, dtype=np.float64)
    err_ncols = err_c.shape[1]

    # Quality matrix: the C++ side rounds each value to uint8_t via round().
    # When absent, pass a NULL pointer and derive maxlen from the sequences.
    if quals is not None:
        qual_c = np.ascontiguousarray(quals, dtype=np.float64)
        seq_maxlen = qual_c.shape[1]
        qual_ptr = qual_c.ctypes.data_as(ct.POINTER(ct.c_double))
    else:
        seq_maxlen = max(len(s) for s in seqs)
        qual_ptr = ct.POINTER(ct.c_double)()

    # Argument order here must mirror the dada2_run C signature exactly.
    out_ptr = _lib.dada2_run(
        c_seqs,
        abunds.ctypes.data_as(ct.POINTER(ct.c_int)),
        prior_flags.ctypes.data_as(ct.POINTER(ct.c_int)),
        ct.c_int(n_unique),
        err_c.ctypes.data_as(ct.POINTER(ct.c_double)),
        ct.c_int(err_ncols),
        qual_ptr,
        ct.c_int(seq_maxlen),
        ct.c_int(match), ct.c_int(mismatch), ct.c_int(gap_pen),
        ct.c_int(int(use_kmers)), ct.c_double(kdist_cutoff), ct.c_int(band_size),
        ct.c_double(omega_a), ct.c_double(omega_p), ct.c_double(omega_c),
        ct.c_int(int(detect_singletons)), ct.c_int(max_clust),
        ct.c_double(min_fold), ct.c_int(min_hamming), ct.c_int(min_abund),
        ct.c_int(int(use_quals)), ct.c_int(int(vectorized_alignment)),
        ct.c_int(homo_gap_pen),
        ct.c_int(int(multithread)), ct.c_int(int(verbose)),
        ct.c_int(sse), ct.c_int(int(gapless)), ct.c_int(int(greedy)),
    )

    if not out_ptr:
        raise RuntimeError("dada2_run returned NULL")

    out = out_ptr.contents
    n_clusters = out.nclust
    n_raw = out.nraw

    # Copy each C-owned buffer into a fresh numpy array before freeing.
    def _copied(ptr, length):
        return np.ctypeslib.as_array(ptr, shape=(length,)).copy()

    if out.ncol_trans > 0:
        trans = _copied(out.trans, 16 * out.ncol_trans).reshape(16, out.ncol_trans)
    else:
        trans = np.zeros((16, 0), dtype=np.int32)

    result = {
        "cluster_seqs": [out.cluster_seqs[i].decode("ascii") for i in range(n_clusters)],
        "cluster_abunds": _copied(out.cluster_abunds, n_clusters),
        "cluster_n0": _copied(out.cluster_n0, n_clusters),
        "cluster_n1": _copied(out.cluster_n1, n_clusters),
        "cluster_nunq": _copied(out.cluster_nunq, n_clusters),
        "trans": trans,
        "map": _copied(out.map, n_raw),
        "pval": _copied(out.pval, n_raw),
    }

    _lib.dada2_result_free(out_ptr)
    return result

run_taxonomy

run_taxonomy(seqs, refs, ref_to_genus, genusmat, ngenus, nlevel, verbose=True)

Run dada2 taxonomy assignment via C library.

Parameters:

Name Type Description Default
seqs

list of query sequences (str)

required
refs

list of reference sequences (str)

required
ref_to_genus

numpy array (nref,) int32, 0-indexed genus ID per ref

required
genusmat

numpy array (ngenus, nlevel) int32, genus-to-level mapping

required
ngenus

int

required
nlevel

int

required
verbose

bool

True

Returns:

Type Description

dict with two keys — rval: numpy array (nseq,) int32, 1-indexed best genus per query (0 = NA); rboot: numpy array (nseq, nlevel) int32, bootstrap counts.

Source code in papa2/_cdada.py
def run_taxonomy(seqs, refs, ref_to_genus, genusmat, ngenus, nlevel, verbose=True):
    """Run dada2 taxonomy assignment via C library.

    Args:
        seqs: list of query sequences (str)
        refs: list of reference sequences (str)
        ref_to_genus: numpy array (nref,) int32, 0-indexed genus ID per ref
        genusmat: numpy array (ngenus, nlevel) int32, genus-to-level mapping
        ngenus: int
        nlevel: int
        verbose: bool

    Returns:
        dict with:
            rval: numpy array (nseq,) int32, 1-indexed best genus per query (0=NA)
            rboot: numpy array (nseq, nlevel) int32, bootstrap counts
    """
    nseq = len(seqs)
    nref = len(refs)

    # Robustness: with no query sequences there is nothing to classify.
    # Return well-formed empty arrays instead of handing a zero-length batch
    # to the C library (mirrors the empty-input guard in run_dada).
    if nseq == 0:
        return {
            "rval": np.array([], dtype=np.int32),
            "rboot": np.zeros((0, nlevel), dtype=np.int32),
        }

    # Convert query sequences to C strings.
    seq_arr = (ct.c_char_p * nseq)()
    for i, s in enumerate(seqs):
        seq_arr[i] = s.encode("ascii") if isinstance(s, str) else s

    # Convert reference sequences to C strings.
    ref_arr = (ct.c_char_p * nref)()
    for i, s in enumerate(refs):
        ref_arr[i] = s.encode("ascii") if isinstance(s, str) else s

    # Reference-to-genus mapping (contiguous int32 as the C side expects).
    rtg = np.ascontiguousarray(ref_to_genus, dtype=np.int32)

    # Genus matrix (row-major int32).
    gmat = np.ascontiguousarray(genusmat, dtype=np.int32)

    res_ptr = _lib.dada2_assign_taxonomy(
        seq_arr, nseq,
        ref_arr, nref,
        rtg.ctypes.data_as(ct.POINTER(ct.c_int)),
        gmat.ctypes.data_as(ct.POINTER(ct.c_int)),
        ngenus, nlevel,
        ct.c_int(int(verbose))
    )

    if not res_ptr:
        raise RuntimeError("dada2_assign_taxonomy returned NULL")

    # Copy results out of the C-owned struct before freeing it.
    res = res_ptr.contents
    result = {
        "rval": np.ctypeslib.as_array(res.rval, shape=(nseq,)).copy(),
        "rboot": np.ctypeslib.as_array(res.rboot, shape=(nseq * nlevel,)).copy().reshape(nseq, nlevel),
    }

    _lib.dada2_tax_result_free(res_ptr)
    return result

nwalign

nwalign(s1, s2, match=5, mismatch=-4, gap_p=-8, band=-1)

NW ends-free alignment of two ACGT strings.

Returns:

Type Description
(al1, al2)

tuple of aligned strings.

Source code in papa2/_cdada.py
def nwalign(s1, s2, match=5, mismatch=-4, gap_p=-8, band=-1):
    """Needleman-Wunsch ends-free alignment of two ACGT strings.

    Returns:
        (al1, al2): tuple of aligned strings.
    """
    def _as_bytes(s):
        # Accept either str (encoded here) or bytes (passed through).
        return s.encode("ascii") if isinstance(s, str) else s

    out1 = ct.c_void_p()
    out2 = ct.c_void_p()
    status = _lib.dada2_nwalign(_as_bytes(s1), _as_bytes(s2),
                                match, mismatch, gap_p, band,
                                ct.byref(out1), ct.byref(out2))
    if status != 0:
        raise RuntimeError("dada2_nwalign failed")

    # Decode the C-owned aligned strings, then release them.
    aligned1 = ct.cast(out1, ct.c_char_p).value.decode("ascii")
    aligned2 = ct.cast(out2, ct.c_char_p).value.decode("ascii")
    _lib.dada2_free_string(out1)
    _lib.dada2_free_string(out2)
    return aligned1, aligned2

eval_pair

eval_pair(al1, al2)

Evaluate an alignment: count matches, mismatches, indels (skipping end gaps).

Returns:

Type Description
(nmatch, nmismatch, nindel)

tuple of ints.

Source code in papa2/_cdada.py
def eval_pair(al1, al2):
    """Evaluate an alignment: count matches, mismatches, indels (skipping end gaps).

    Returns:
        (nmatch, nmismatch, nindel): tuple of ints.
    """
    def _as_bytes(s):
        # Accept either str (encoded here) or bytes (passed through).
        return s.encode("ascii") if isinstance(s, str) else s

    n_match = ct.c_int(0)
    n_mismatch = ct.c_int(0)
    n_indel = ct.c_int(0)
    _lib.dada2_eval_pair(_as_bytes(al1), _as_bytes(al2),
                         ct.byref(n_match), ct.byref(n_mismatch), ct.byref(n_indel))
    return n_match.value, n_mismatch.value, n_indel.value

pair_consensus

pair_consensus(al1, al2, prefer=1, trim_overhang=True)

Build consensus from two aligned strings.

Parameters:

Name Type Description Default
prefer

1 = al1 wins mismatches, 2 = al2 wins.

1
trim_overhang

if True, trim overhanging ends.

True

Returns:

Type Description

consensus string.

Source code in papa2/_cdada.py
def pair_consensus(al1, al2, prefer=1, trim_overhang=True):
    """Build consensus from two aligned strings.

    Args:
        prefer: 1 = al1 wins mismatches, 2 = al2 wins.
        trim_overhang: if True, trim overhanging ends.

    Returns:
        consensus string.
    """
    def _as_bytes(s):
        # Accept either str (encoded here) or bytes (passed through).
        return s.encode("ascii") if isinstance(s, str) else s

    cons_ptr = _lib.dada2_pair_consensus(_as_bytes(al1), _as_bytes(al2),
                                         prefer, int(trim_overhang))
    if not cons_ptr:
        raise RuntimeError("dada2_pair_consensus failed")

    # Decode the C-owned consensus string, then release it.
    consensus = ct.cast(cons_ptr, ct.c_char_p).value.decode("ascii")
    _lib.dada2_free_string(cons_ptr)
    return consensus

rc

rc(seq)

Reverse complement an ACGT string.

Source code in papa2/_cdada.py
def rc(seq):
    """Reverse complement an ACGT string."""
    # Accept either str (encoded here) or bytes (passed through).
    if isinstance(seq, str):
        raw = seq.encode("ascii")
    else:
        raw = seq

    rc_ptr = _lib.dada2_rc(raw)
    if not rc_ptr:
        raise RuntimeError("dada2_rc failed")

    # Decode the C-owned string, then release it.
    revcomp = ct.cast(rc_ptr, ct.c_char_p).value.decode("ascii")
    _lib.dada2_free_string(rc_ptr)
    return revcomp