Skip to content

API Reference

This page documents all public functions in microscape. Each function's docstring is rendered directly from the source.


microscape.filter -- Sequence Table Filtering

Quality-control filtering of the ASV sequence table by length, prevalence, abundance, and sample depth.

filter_seqtab

filter_seqtab(df, min_length=50, min_samples=2, min_seqs=2, min_reads=1000)

Apply cascading quality filters to a long-format sequence table.

Parameters

df : pd.DataFrame Long-format DataFrame with columns sample, sequence, count. min_length : int Minimum ASV sequence length in base-pairs. ASVs shorter than this are removed because they cannot be reliably assigned taxonomy. min_samples : int Minimum number of samples an ASV must appear in to be retained (prevalence filter). ASVs below this threshold are considered orphans. min_seqs : int Minimum total read count for an ASV across all samples (abundance filter). ASVs below this are considered singletons/doubletons. min_reads : int Minimum total reads per sample (depth filter). Samples with fewer reads are removed as insufficiently sequenced.

Returns

dict "filtered" -- pd.DataFrame of retained observations. "orphans" -- pd.DataFrame of ASVs removed by the prevalence filter. "small_samples" -- pd.DataFrame of samples removed by the depth filter. "stats" -- pd.DataFrame summarising each filter step.

Source code in microscape/filter.py
def filter_seqtab(df, min_length=50, min_samples=2, min_seqs=2, min_reads=1000):
    """Apply cascading quality filters to a long-format sequence table.

    Filters are applied in order -- length, prevalence, abundance, depth --
    and each step operates on the output of the previous one (e.g. abundance
    totals are computed after short and orphan ASVs are already removed).

    Parameters
    ----------
    df : pd.DataFrame
        Long-format DataFrame with columns ``sample``, ``sequence``, ``count``.
    min_length : int
        Minimum ASV sequence length in base-pairs. ASVs shorter than this are
        removed because they cannot be reliably assigned taxonomy.
    min_samples : int
        Minimum number of samples an ASV must appear in to be retained
        (prevalence filter). ASVs below this threshold are considered orphans.
    min_seqs : int
        Minimum total read count for an ASV across all samples (abundance
        filter). ASVs below this are considered singletons/doubletons.
    min_reads : int
        Minimum total reads per sample (depth filter). Samples with fewer
        reads are removed as insufficiently sequenced.

    Returns
    -------
    dict
        ``"filtered"``      -- pd.DataFrame of retained observations.
        ``"orphans"``       -- pd.DataFrame of ASVs removed by the prevalence
                               filter.
        ``"small_samples"`` -- pd.DataFrame of samples removed by the depth
                               filter.
        ``"stats"``         -- pd.DataFrame summarising each filter step.

    Raises
    ------
    TypeError
        If *df* is not a pandas DataFrame.
    ValueError
        If a required column is missing.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Expected a pandas DataFrame with columns: sample, sequence, count")
    for col in ("sample", "sequence", "count"):
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    dt = df.copy()
    # Captured before any filtering so pct_reads_retained is relative to input.
    total_input = int(dt["count"].sum())

    # ------------------------------------------------------------------
    # 1. Remove short sequences
    # ------------------------------------------------------------------
    # The "sequence" values ARE the ASV sequences, so lengths can be taken
    # directly from the unique values (vectorized) instead of a groupby with
    # a per-group lambda.
    unique_seqs = pd.Series(dt["sequence"].unique())
    short_seqs = set(unique_seqs[unique_seqs.str.len() < min_length])
    n_short = len(short_seqs)

    if short_seqs:
        dt = dt[~dt["sequence"].isin(short_seqs)].copy()

    # ------------------------------------------------------------------
    # 2. Remove low-prevalence ASVs (orphans)
    # ------------------------------------------------------------------
    seq_prevalence = dt.groupby("sequence")["sample"].nunique().reset_index()
    seq_prevalence.columns = ["sequence", "n_samples"]
    orphan_seqs = set(
        seq_prevalence.loc[seq_prevalence["n_samples"] < min_samples, "sequence"]
    )
    n_orphans = len(orphan_seqs)

    dt_orphans = dt[dt["sequence"].isin(orphan_seqs)].copy()
    dt = dt[~dt["sequence"].isin(orphan_seqs)].copy()

    # ------------------------------------------------------------------
    # 3. Remove low-abundance ASVs
    # ------------------------------------------------------------------
    seq_abundance = dt.groupby("sequence")["count"].sum().reset_index()
    seq_abundance.columns = ["sequence", "total"]
    rare_seqs = set(seq_abundance.loc[seq_abundance["total"] < min_seqs, "sequence"])
    n_rare = len(rare_seqs)

    dt = dt[~dt["sequence"].isin(rare_seqs)].copy()

    # ------------------------------------------------------------------
    # 4. Remove shallow samples
    # ------------------------------------------------------------------
    sample_depth = dt.groupby("sample")["count"].sum().reset_index()
    sample_depth.columns = ["sample", "total"]
    small_samples = set(sample_depth.loc[sample_depth["total"] < min_reads, "sample"])
    n_small = len(small_samples)

    dt_small = dt[dt["sample"].isin(small_samples)].copy()
    dt = dt[~dt["sample"].isin(small_samples)].copy()

    # ------------------------------------------------------------------
    # Summary statistics
    # ------------------------------------------------------------------
    n_final_samples = dt["sample"].nunique()
    n_final_asvs = dt["sequence"].nunique()
    n_final_reads = int(dt["count"].sum())
    # max(..., 1) guards against division by zero on an empty input table.
    pct_retained = round(n_final_reads / max(total_input, 1) * 100, 1)

    def _step(step, **overrides):
        # One stats row; every column defaults to None so each filter step
        # only fills in the fields that apply to it.
        row = {
            "step": step,
            "asvs_removed": None,
            "samples_removed": None,
            "remaining_samples": None,
            "remaining_asvs": None,
            "pct_reads_retained": None,
        }
        row.update(overrides)
        return row

    stats = pd.DataFrame([
        _step("length", asvs_removed=n_short),
        _step("prevalence", asvs_removed=n_orphans),
        _step("abundance", asvs_removed=n_rare),
        _step("depth", samples_removed=n_small),
        _step("final", remaining_samples=n_final_samples,
              remaining_asvs=n_final_asvs, pct_reads_retained=pct_retained),
    ])

    return {
        "filtered": dt,
        "orphans": dt_orphans,
        "small_samples": dt_small,
        "stats": stats,
    }

plot_filter_summary

plot_filter_summary(df, output=None)

Generate diagnostic rank-abundance plots for a filtered sequence table.

Parameters

df : pd.DataFrame Long-format DataFrame with columns sample, sequence, count. Typically the "filtered" value returned by :func:filter_seqtab. output : str or None Path to a PDF file to save the plot. If None, the matplotlib Figure is returned instead of being saved.

Returns

matplotlib.figure.Figure or None The figure object when output is None; otherwise None (the figure is saved and closed).

Source code in microscape/filter.py
def plot_filter_summary(df, output=None):
    """Generate diagnostic rank-abundance plots for a filtered sequence table.

    Draws a 2x2 panel of rank-ordered log10 curves: reads per ASV, reads per
    sample, samples per ASV, and ASVs per sample. If *df* is empty, a single
    placeholder message is shown instead.

    Parameters
    ----------
    df : pd.DataFrame
        Long-format DataFrame with columns ``sample``, ``sequence``, ``count``.
        Typically the ``"filtered"`` value returned by :func:`filter_seqtab`.
    output : str or None
        Path to a PDF file to save the plot. If *None*, the matplotlib Figure
        is returned instead of being saved.

    Returns
    -------
    matplotlib.figure.Figure or None
        The figure object when *output* is None; otherwise None (the figure
        is saved and closed).
    """
    fig, axes = plt.subplots(2, 2, figsize=(10, 8))

    if len(df) > 0:
        # One entry per panel: (axis, rank-ordered series, title, y-label,
        # x-label, colour). Read-depth panels are blue, richness panels green.
        panels = [
            (axes[0, 0], df.groupby("sequence")["count"].sum(),
             "Reads per ASV", "log10(reads)", "ASV rank", "steelblue"),
            (axes[0, 1], df.groupby("sample")["count"].sum(),
             "Reads per Sample", "log10(reads)", "Sample rank", "steelblue"),
            (axes[1, 0], df.groupby("sequence")["sample"].nunique(),
             "Samples per ASV", "log10(samples)", "ASV rank", "darkgreen"),
            (axes[1, 1], df.groupby("sample")["sequence"].nunique(),
             "ASVs per Sample", "log10(ASVs)", "Sample rank", "darkgreen"),
        ]
        for ax, series, title, ylabel, xlabel, color in panels:
            ranked = series.sort_values()
            ax.plot(
                range(len(ranked)),
                np.log10(ranked.values.astype(float)),
                "o", markersize=1, color=color,
            )
            ax.set_title(title)
            ax.set_ylabel(ylabel)
            ax.set_xlabel(xlabel)
    else:
        axes[0, 0].text(
            0.5, 0.5, "No data remaining after filtering",
            ha="center", va="center", fontsize=14,
            transform=axes[0, 0].transAxes,
        )

    fig.tight_layout()

    if output is not None:
        with PdfPages(output) as pdf:
            pdf.savefig(fig)
        # NOTE(review): this closes EVERY open matplotlib figure, not just
        # ours -- kept for backward compatibility, but callers holding other
        # figures should be aware.
        plt.close("all")
        return None

    return fig

microscape.metadata -- Metadata Loading

Load and validate MIMARKS-compliant sample metadata.

load_metadata

load_metadata(path, seqtab=None, id_col='sample_name')

Load sample metadata from a TSV or CSV file.

Parameters

path : str Path to the metadata file. Files with .tsv or .txt extensions are read as tab-separated; everything else as comma-separated. seqtab : pd.DataFrame or None Optional long-format sequence table (columns: sample, sequence, count). When provided, the returned DataFrame includes additional _match_status information so users can see which samples exist only in the pipeline output, only in the metadata, or in both. id_col : str Name of the column containing sample identifiers. If this column is not found, the first column of the file is used instead.

Returns

pd.DataFrame Metadata indexed by sample identifier. The DataFrame has an attribute match_stats (a dict) summarising the overlap between the metadata and the sequence table when seqtab is provided.

Source code in microscape/metadata.py
def load_metadata(path, seqtab=None, id_col="sample_name"):
    """Load sample metadata from a TSV or CSV file.

    The file's extension decides the delimiter (``.tsv``/``.txt`` are
    tab-separated, anything else comma-separated). Known MIMARKS fields
    present in the header are recorded, and -- when a sequence table is
    supplied -- the overlap between metadata samples and pipeline samples
    is summarised.

    Parameters
    ----------
    path : str
        Path to the metadata file.
    seqtab : pd.DataFrame or None
        Optional long-format sequence table (columns: sample, sequence,
        count) used to cross-check sample identifiers.
    id_col : str
        Column holding sample identifiers; falls back to the first column
        when absent.

    Returns
    -------
    pd.DataFrame
        Metadata indexed by sample identifier, with a ``match_stats`` dict
        stored in ``.attrs`` describing MIMARKS fields found and (when
        *seqtab* is given) the sample overlap.
    """
    # Delimiter is chosen from the file extension.
    extension = os.path.splitext(path)[1].lower()
    delimiter = "\t" if extension in (".tsv", ".txt") else ","
    meta = pd.read_csv(path, sep=delimiter)

    # Record which MIMARKS fields the header provides (checked before the
    # sample-ID column is consumed as the index).
    found_mimarks = [field for field in MIMARKS_FIELDS if field in meta.columns]

    # Index by the requested ID column, or the first column as a fallback.
    index_col = id_col if id_col in meta.columns else meta.columns[0]
    meta = meta.set_index(index_col)

    match_stats = {
        "mimarks_fields": found_mimarks,
        "n_metadata": len(meta),
    }

    if seqtab is not None:
        # Tolerate malformed seqtab inputs: treat them as "no pipeline samples".
        if isinstance(seqtab, pd.DataFrame) and "sample" in seqtab.columns:
            pipeline_samples = set(seqtab["sample"].unique())
        else:
            pipeline_samples = set()

        meta_samples = set(meta.index)
        match_stats.update({
            "matched": len(pipeline_samples & meta_samples),
            "pipeline_only": len(pipeline_samples - meta_samples),
            "metadata_only": len(meta_samples - pipeline_samples),
            "total_pipeline": len(pipeline_samples),
        })

    # Stash the summary on the DataFrame itself so callers can inspect it.
    meta.attrs["match_stats"] = match_stats

    return meta

microscape.renormalize -- Taxonomic Renormalization

Split ASVs into taxonomic groups and compute within-group proportional abundances.

renormalize

renormalize(df, taxa)

Group ASVs by taxonomy and compute within-group proportions.

Parameters

df : pd.DataFrame Long-format count table with columns sample, sequence, count. taxa : pd.DataFrame Taxonomy table indexed by sequence. Expected to contain columns for at least one of Kingdom/Domain, Order, and Family (case- insensitive matching is applied).

Returns

dict A dictionary with the following keys:

* One key per biological group (e.g. ``"prokaryote"``,
  ``"eukaryote"``, ``"chloroplast"``, ``"mitochondria"``,
  ``"unknown"``), each mapping to a long-format pd.DataFrame
  with an added ``proportion`` column representing within-group
  relative abundance.
* ``"group_assignments"`` -- a pd.DataFrame mapping each sequence
  to its assigned group.
* ``"stats"`` -- a pd.DataFrame with per-group summary statistics.
Source code in microscape/renormalize.py
def renormalize(df, taxa):
    """Group ASVs by taxonomy and compute within-group proportions.

    Each sequence is assigned to a biological group from its taxonomy row;
    sequences missing from *taxa* fall into ``"unknown"``. Counts are then
    renormalised to proportions within each (group, sample) pair.

    Parameters
    ----------
    df : pd.DataFrame
        Long-format count table with columns ``sample``, ``sequence``,
        ``count``.
    taxa : pd.DataFrame
        Taxonomy table indexed by sequence. Expected to contain columns
        for at least one of Kingdom/Domain, Order, and Family (case-
        insensitive matching is applied).

    Returns
    -------
    dict
        One key per biological group (e.g. ``"prokaryote"``, ``"eukaryote"``,
        ``"chloroplast"``, ``"mitochondria"``, ``"unknown"``), each mapping
        to a long-format pd.DataFrame with an added ``proportion`` column;
        plus ``"group_assignments"`` (sequence-to-group table) and
        ``"stats"`` (per-group summary).
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("df must be a pandas DataFrame with columns: sample, sequence, count")
    for required in ("sample", "sequence", "count"):
        if required not in df.columns:
            raise ValueError(f"Missing required column: {required}")
    if not isinstance(taxa, pd.DataFrame):
        raise TypeError("taxa must be a pandas DataFrame indexed by sequence")

    # Resolve taxonomy column names -- naming differs between classifiers.
    col_domain = _find_col(taxa, ["Kingdom", "Domain", "Level1"])
    col_order = _find_col(taxa, ["Order", "Level4"])
    col_family = _find_col(taxa, ["Family", "Level5"])

    # Map every sequence in the taxonomy table to its group.
    seq_groups = {
        seq: _assign_group(taxa.loc[seq], col_domain, col_order, col_family)
        for seq in taxa.index
    }

    # Sequences in the count table but missing from the taxonomy default
    # to "unknown" (setdefault never overwrites real assignments).
    for seq in df["sequence"].unique():
        seq_groups.setdefault(seq, "unknown")

    # Attach the group column and renormalise within (group, sample).
    seqtab = df.copy()
    seqtab["group"] = seqtab["sequence"].map(seq_groups)

    pair_totals = seqtab.groupby(["group", "sample"])["count"].transform("sum")
    # 0/0 pairs become NaN; coerce those to 0.0.
    seqtab["proportion"] = (seqtab["count"] / pair_totals).fillna(0.0)

    # One DataFrame per group, with a clean positional index.
    by_group = {
        name: part.reset_index(drop=True)
        for name, part in seqtab.groupby("group")
    }

    # Per-group summary statistics.
    stats_rows = []
    for name in sorted(by_group):
        part = by_group[name]
        depth_per_sample = part.groupby("sample")["count"].sum()
        stats_rows.append({
            "group": name,
            "n_asvs": part["sequence"].nunique(),
            "n_samples": part["sample"].nunique(),
            "total_reads": int(part["count"].sum()),
            "mean_reads_per_sample": round(depth_per_sample.mean(), 1),
            "median_reads_per_sample": round(depth_per_sample.median(), 1),
        })
    stats_df = pd.DataFrame(stats_rows)

    # Flat sequence-to-group lookup table.
    group_assignments = pd.DataFrame({
        "sequence": list(seq_groups.keys()),
        "group": list(seq_groups.values()),
    })

    result = dict(by_group)
    result["group_assignments"] = group_assignments
    result["stats"] = stats_df
    return result

microscape.phylogeny -- Phylogenetic Tree Construction

Multiple sequence alignment (MAFFT) and neighbor-joining tree construction.

build_phylogeny

build_phylogeny(sequences, output_newick=None, cpus=1, min_coverage=0.6)

Build a multiple sequence alignment and neighbor-joining tree.

Parameters

sequences : list of str DNA sequences (ASVs) to align and build into a tree. output_newick : str or None If provided, the Newick tree string is also written to this file path. cpus : int Number of CPU threads to pass to MAFFT. min_coverage : float Minimum fraction of sequences that must have a non-gap character at a given alignment column for it to be retained during trimming.

Returns

dict "tree_newick" -- str, the Newick-format tree. "distance_matrix" -- np.ndarray, pairwise distance matrix. "alignment" -- str, trimmed alignment in FASTA format. "seq_map" -- dict mapping ASV identifiers to original sequences. "asv_ids" -- list of ASV identifier strings.

Source code in microscape/phylogeny.py
def build_phylogeny(sequences, output_newick=None, cpus=1, min_coverage=0.60):
    """Build a multiple sequence alignment and neighbor-joining tree.

    Pipeline: deduplicate inputs, assign stable ASV IDs, align with MAFFT
    (falling back to simple gap-padding if MAFFT fails), trim low-coverage
    alignment columns, compute pairwise distances, and build a Newick tree.

    Parameters
    ----------
    sequences : list of str
        DNA sequences (ASVs) to align and build into a tree.
    output_newick : str or None
        If provided, the Newick tree string is also written to this file path.
    cpus : int
        Number of CPU threads to pass to MAFFT.
    min_coverage : float
        Minimum fraction of sequences that must have a non-gap character at
        a given alignment column for it to be retained during trimming.

    Returns
    -------
    dict
        ``"tree_newick"``      -- str, the Newick-format tree.
        ``"distance_matrix"``  -- np.ndarray, pairwise distance matrix.
        ``"alignment"``        -- str, trimmed alignment in FASTA format.
        ``"seq_map"``          -- dict mapping ASV identifiers to original
                                  sequences.
        ``"asv_ids"``          -- list of ASV identifier strings.

    Raises
    ------
    ValueError
        If *sequences* is empty.
    RuntimeError
        If trimming removes every alignment column.
    """
    if not sequences:
        raise ValueError("No sequences provided")

    # Deduplicate; sorting makes the ASV_xxxxx IDs assigned below
    # deterministic for a given input set.
    unique_seqs = sorted(set(sequences))
    n_seqs = len(unique_seqs)

    # ------------------------------------------------------------------
    # Assign stable ASV IDs
    # ------------------------------------------------------------------
    # IDs are zero-padded to 5 digits so they sort lexicographically.
    seq_map = {}
    for i, seq in enumerate(unique_seqs):
        asv_id = f"ASV_{i + 1:05d}"
        seq_map[asv_id] = seq
    id_list = list(seq_map.keys())

    # ------------------------------------------------------------------
    # Write temp FASTA and run MAFFT
    # ------------------------------------------------------------------
    tmp_dir = tempfile.mkdtemp(prefix="phylo_")
    tmp_input = os.path.join(tmp_dir, "input.fa")
    tmp_aligned = os.path.join(tmp_dir, "aligned.fa")

    try:
        with open(tmp_input, "w") as fh:
            for asv_id in id_list:
                fh.write(f">{asv_id}\n{seq_map[asv_id]}\n")

        # _run_mafft presumably returns a success flag (False when the MAFFT
        # binary is missing or exits non-zero) -- TODO confirm in filter.py.
        mafft_ok = _run_mafft(tmp_input, tmp_aligned, cpus=cpus)

        if not mafft_ok:
            # Fallback: pad sequences to equal length (no true alignment)
            # so downstream distance/tree code still runs on degraded input.
            max_len = max(len(s) for s in unique_seqs)
            with open(tmp_aligned, "w") as fh:
                for asv_id in id_list:
                    padded = seq_map[asv_id].ljust(max_len, "-")
                    fh.write(f">{asv_id}\n{padded}\n")

        # --------------------------------------------------------------
        # Parse aligned FASTA
        # --------------------------------------------------------------
        # _parse_fasta appears to return a dict of {id: sequence} (used as a
        # mapping below) -- TODO confirm.
        aligned = _parse_fasta(tmp_aligned)
        aligned_seqs = [aligned[aid].upper() for aid in id_list]
        # Right-pad with gaps so every row has equal length, in case the
        # aligner emitted ragged records.
        aln_len = max(len(s) for s in aligned_seqs) if aligned_seqs else 0
        aligned_seqs = [s.ljust(aln_len, "-") for s in aligned_seqs]

    finally:
        # Always clean up the temp directory, even if MAFFT or parsing fails.
        shutil.rmtree(tmp_dir, ignore_errors=True)

    # ------------------------------------------------------------------
    # Trim alignment (positions where < min_coverage of seqs have data)
    # ------------------------------------------------------------------
    # One row per sequence, one column per alignment position.
    aln_matrix = np.array([list(s) for s in aligned_seqs])
    # Per-column fraction of non-gap characters.
    coverage = np.mean(aln_matrix != "-", axis=0)
    keep_cols = coverage >= min_coverage
    n_kept = keep_cols.sum()

    if n_kept == 0:
        raise RuntimeError(
            "All alignment positions were trimmed -- no informative data remaining"
        )

    aln_matrix = aln_matrix[:, keep_cols]
    trimmed_seqs = ["".join(row) for row in aln_matrix]
    trim_len = len(trimmed_seqs[0])

    # ------------------------------------------------------------------
    # Compute distance matrix
    # ------------------------------------------------------------------
    dist_matrix = _compute_distance_matrix(trimmed_seqs, n_seqs, trim_len)

    # ------------------------------------------------------------------
    # Build tree
    # ------------------------------------------------------------------
    newick_str = _build_newick(dist_matrix, id_list, n_seqs)

    # ------------------------------------------------------------------
    # Build alignment FASTA string
    # ------------------------------------------------------------------
    alignment_parts = []
    for aid, seq in zip(id_list, trimmed_seqs):
        alignment_parts.append(f">{aid}\n{seq}")
    alignment_str = "\n".join(alignment_parts) + "\n"

    # ------------------------------------------------------------------
    # Optionally write Newick to file
    # ------------------------------------------------------------------
    if output_newick is not None:
        with open(output_newick, "w") as fh:
            fh.write(newick_str + "\n")

    return {
        "tree_newick": newick_str,
        "distance_matrix": dist_matrix,
        "alignment": alignment_str,
        "seq_map": seq_map,
        "asv_ids": id_list,
    }

microscape.ordination -- Ordination

Bray-Curtis dissimilarity with t-SNE or PCA dimensionality reduction.

ordinate

ordinate(df, method='tsne', metric='braycurtis', perplexity=30)

Compute ordination coordinates for samples and ASVs.

Parameters

df : pd.DataFrame Long-format count table with columns sample, sequence, count. method : str Dimensionality reduction method. Currently supports "tsne" (PCA followed by t-SNE) and "pca" (PCA only). metric : str Distance metric passed to :func:scipy.spatial.distance.pdist. Common choices: "braycurtis", "euclidean", "jaccard". perplexity : float Perplexity parameter for t-SNE. Automatically adjusted downwards when the number of entities is small.

Returns

dict "sample_coords" -- pd.DataFrame with columns label, dim1, dim2 for samples. "asv_coords" -- pd.DataFrame with columns label, dim1, dim2 for ASVs. "distances" -- np.ndarray, sample-by-sample distance matrix.

Source code in microscape/ordination.py
def ordinate(df, method="tsne", metric="braycurtis", perplexity=30):
    """Compute ordination coordinates for samples and ASVs.

    Counts are pivoted to a samples-by-sequences matrix, normalised to
    per-sample proportions, and ordinated twice: once for samples and once
    (on the transpose) for ASVs.

    Parameters
    ----------
    df : pd.DataFrame
        Long-format count table with columns ``sample``, ``sequence``,
        ``count``. Common alternative column names (e.g. ``sample_id``,
        ``asv``, ``abundance``) are recognised and renamed.
    method : str
        Dimensionality reduction method: ``"tsne"`` (PCA followed by t-SNE)
        or ``"pca"`` (PCA only).
    metric : str
        Distance metric passed to :func:`scipy.spatial.distance.pdist`,
        e.g. ``"braycurtis"``, ``"euclidean"``, ``"jaccard"``.
    perplexity : float
        t-SNE perplexity; adjusted downwards automatically for small inputs.

    Returns
    -------
    dict
        ``"sample_coords"`` / ``"asv_coords"`` -- pd.DataFrames with columns
        ``label``, ``dim1``, ``dim2``; ``"distances"`` -- np.ndarray
        sample-by-sample distance matrix.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("df must be a pandas DataFrame with columns: sample, sequence, count")

    # Accept common alternative column names and normalise them.
    required_cols = {"sample", "sequence", "count"}
    if not required_cols.issubset(set(df.columns)):
        aliases = {
            "sample": "sample", "sample_id": "sample", "sampleid": "sample",
            "sequence": "sequence", "seq": "sequence", "asv": "sequence",
            "count": "count", "abundance": "count", "reads": "count",
        }
        col_map = {
            col: aliases[col.lower()]
            for col in df.columns
            if col.lower() in aliases
        }
        if set(col_map.values()) >= required_cols:
            df = df.rename(columns=col_map)
        else:
            raise ValueError(
                f"Expected columns {required_cols}, got {set(df.columns)}"
            )

    # ------------------------------------------------------------------
    # Pivot to wide matrix (samples x sequences) and normalise to proportions
    # ------------------------------------------------------------------
    counts = df.pivot_table(
        index="sample", columns="sequence", values="count", fill_value=0
    )
    # Replace zero row sums with 1 to avoid division by zero.
    depths = counts.sum(axis=1).replace(0, 1)
    props = counts.div(depths, axis=0)

    sample_labels = list(props.index)
    seq_labels = list(props.columns)

    # ------------------------------------------------------------------
    # Sample ordination
    # ------------------------------------------------------------------
    sample_dist = _compute_distance_matrix(props.values, metric=metric)
    sample_coords = _run_ordination(
        sample_dist, sample_labels, method=method, perplexity=perplexity
    )

    # ------------------------------------------------------------------
    # ASV ordination (on the transposed proportion matrix)
    # ------------------------------------------------------------------
    seq_dist = _compute_distance_matrix(props.T.values, metric=metric)
    asv_coords = _run_ordination(
        seq_dist, seq_labels, method=method, perplexity=perplexity
    )

    return {
        "sample_coords": sample_coords,
        "asv_coords": asv_coords,
        "distances": sample_dist,
    }

microscape.network -- Co-occurrence Networks

SparCC-style compositional correlation networks.

sparcc_network

sparcc_network(df, min_prevalence=0.1, min_correlation=0.1)

Compute a CLR-based co-occurrence network (SparCC approximation).

Parameters

df : pd.DataFrame Long-format count table with columns sample, sequence, count. min_prevalence : float Minimum fraction of samples in which an ASV must be present to be included in the network (0 to 1). min_correlation : float Minimum absolute Pearson correlation (on CLR-transformed data) for an edge to be retained in the output.

Returns

pd.DataFrame Edge list with columns node1, node2, correlation, weight (absolute correlation), and color ("positive" or "negative"). Sorted by descending weight.

Source code in microscape/network.py
def sparcc_network(df, min_prevalence=0.1, min_correlation=0.1):
    """Build a co-occurrence edge list via CLR-transformed Pearson correlation.

    This approximates a SparCC-style compositional network: counts are
    pivoted to a samples x ASVs matrix, rare ASVs are dropped by prevalence,
    the matrix is CLR-transformed, and pairwise Pearson correlations are
    thresholded into an edge list.

    Parameters
    ----------
    df : pd.DataFrame
        Long-format count table with columns ``sample``, ``sequence``,
        ``count``. Common aliases (e.g. ``sample_id``, ``asv``,
        ``abundance``) are recognised and renamed automatically.
    min_prevalence : float
        Fraction of samples (0 to 1) in which an ASV must be observed to
        enter the network.
    min_correlation : float
        Minimum absolute CLR-Pearson correlation for an edge to be kept.

    Returns
    -------
    pd.DataFrame
        Edge list with columns ``node1``, ``node2``, ``correlation``,
        ``weight`` (absolute correlation), and ``color`` (``"positive"`` or
        ``"negative"``), sorted by descending weight. Empty (columns only)
        when fewer than two ASVs survive filtering or no edge passes the
        threshold.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("df must be a pandas DataFrame with columns: sample, sequence, count")

    # Normalise flexible column names down to the canonical trio.
    required_cols = {"sample", "sequence", "count"}
    if not required_cols.issubset(set(df.columns)):
        aliases = {
            "sample": "sample", "sample_id": "sample", "sampleid": "sample",
            "sequence": "sequence", "seq": "sequence", "asv": "sequence",
            "count": "count", "abundance": "count", "reads": "count",
        }
        rename_map = {
            col: aliases[col.lower()]
            for col in df.columns
            if col.lower() in aliases
        }
        if not set(rename_map.values()) >= required_cols:
            raise ValueError(
                f"Expected columns {required_cols}, got {set(df.columns)}"
            )
        df = df.rename(columns=rename_map)

    # Samples x sequences count matrix; absent observations become zero.
    wide = df.pivot_table(
        index="sample", columns="sequence", values="count", fill_value=0
    )

    # Prevalence filter: fraction of samples where each ASV is non-zero.
    prevalence = (wide > 0).sum(axis=0) / wide.shape[0]
    keep_mask = prevalence >= min_prevalence

    empty_cols = ["node1", "node2", "correlation", "weight", "color"]
    if int(keep_mask.sum()) < 2:
        # Fewer than two nodes: no pairs to correlate.
        return pd.DataFrame(columns=empty_cols)

    kept = wide.loc[:, keep_mask]
    labels = list(kept.columns)

    # CLR transform tames the compositional constraint before correlating.
    clr_vals = _clr_transform(kept.values)
    corr = np.corrcoef(clr_vals.T)

    # Upper triangle (k=1) excludes self-loops and mirror duplicates.
    rows_i, cols_j = np.triu_indices(len(labels), k=1)
    records = []
    for i, j in zip(rows_i, cols_j):
        r = corr[i, j]
        # NaN arises from constant columns; such pairs carry no signal.
        if np.isnan(r):
            continue
        if abs(r) >= min_correlation:
            records.append(
                {"node1": labels[i], "node2": labels[j], "correlation": r}
            )

    if not records:
        return pd.DataFrame(columns=empty_cols)

    edge_df = pd.DataFrame(records)
    edge_df["weight"] = edge_df["correlation"].abs()
    edge_df["color"] = [
        "positive" if r > 0 else "negative" for r in edge_df["correlation"]
    ]
    return edge_df.sort_values("weight", ascending=False).reset_index(drop=True)

microscape.viz -- Visualization Export

Export all analysis results as a JSON bundle for web-based visualization.

export_viz

export_viz(results_dir, output_dir)

Convert pipeline outputs to JSON files for the Svelte frontend.

Parameters

results_dir : str Directory containing pipeline pickle outputs. Expected files:

* ``seqtab_final.pkl`` -- filtered long-format count table
* ``renorm_merged.pkl`` or ``renorm_by_group.pkl`` -- renormalized data
* ``sample_bray_tsne.pkl`` -- sample t-SNE coordinates
* ``seq_bray_tsne.pkl`` -- ASV t-SNE coordinates
* ``sparcc_correlations.pkl`` -- network edge list
* ``metadata.pkl`` (optional) -- sample metadata
* ``*_taxonomy.pkl`` (optional) -- taxonomy assignments
output_dir : str Directory where JSON files will be written. Created if it does not exist.

Notes

Produces the following files in output_dir:

  * samples.json -- sample metadata with t-SNE coordinates
  * asvs.json.gz -- ASV info with t-SNE coordinates and taxonomy
  * counts.json.gz -- sparse count matrix
  * network.json -- correlation edge list
  * taxonomy.json -- per-database taxonomy assignments
  * renorm_stats.json -- group-level summary statistics
Source code in microscape/viz.py
def export_viz(results_dir, output_dir):
    """Serialise pipeline pickle outputs as JSON for the Svelte frontend.

    Parameters
    ----------
    results_dir : str
        Directory containing pipeline pickle outputs. Expected files:

        * ``seqtab_final.pkl`` -- filtered long-format count table
        * ``renorm_merged.pkl`` or ``renorm_by_group.pkl`` -- renormalized data
        * ``sample_bray_tsne.pkl`` -- sample t-SNE coordinates
        * ``seq_bray_tsne.pkl`` -- ASV t-SNE coordinates
        * ``sparcc_correlations.pkl`` -- network edge list
        * ``metadata.pkl`` (optional) -- sample metadata
        * ``*_taxonomy.pkl`` (optional) -- taxonomy assignments

    output_dir : str
        Directory where JSON files will be written. Created if it does not
        exist.

    Notes
    -----
    Produces the following files in *output_dir*:

    * ``samples.json`` -- sample metadata with t-SNE coordinates
    * ``asvs.json.gz`` -- ASV info with t-SNE coordinates and taxonomy
    * ``counts.json.gz`` -- sparse count matrix
    * ``network.json`` -- correlation edge list
    * ``taxonomy.json`` -- per-database taxonomy assignments
    * ``renorm_stats.json`` -- group-level summary statistics
    """
    os.makedirs(output_dir, exist_ok=True)

    # --- load the mandatory count table -------------------------------
    seqtab = _load_pickle(os.path.join(results_dir, "seqtab_final.pkl"))
    if not isinstance(seqtab, pd.DataFrame):
        raise TypeError("seqtab_final.pkl must contain a pandas DataFrame")
    for required in ("sample", "sequence", "count"):
        if required not in seqtab.columns:
            raise ValueError(f"seqtab missing required column: {required}")

    # --- renormalized data (first of two candidate files that exists) --
    renorm_data = None
    for candidate in ("renorm_merged.pkl", "renorm_by_group.pkl"):
        candidate_path = os.path.join(results_dir, candidate)
        if os.path.isfile(candidate_path):
            renorm_data = _load_pickle(candidate_path)
            break

    # Derive a sequence -> group lookup from either supported layout.
    renorm_table_list = None
    if isinstance(renorm_data, pd.DataFrame):
        if "group" in renorm_data.columns:
            renorm_table_list = (
                renorm_data[["sequence", "group"]].drop_duplicates()
            )
    elif isinstance(renorm_data, dict):
        pairs = [
            {"sequence": seq, "group": grp}
            for grp, sub in renorm_data.items()
            if isinstance(sub, pd.DataFrame) and "sequence" in sub.columns
            for seq in sub["sequence"].unique()
        ]
        if pairs:
            renorm_table_list = pd.DataFrame(pairs).drop_duplicates()

    # --- taxonomy assignments (per-database dict) ----------------------
    taxonomy_dict = _load_taxonomy_dir(results_dir)

    # --- sample metadata (optional; best-effort load) ------------------
    metadata = None
    meta_path = os.path.join(results_dir, "metadata.pkl")
    if os.path.isfile(meta_path):
        try:
            metadata = _load_pickle(meta_path)
        except Exception:
            # Metadata is decorative; a bad pickle must not abort export.
            metadata = None

    # --- ordination coordinates ---------------------------------------
    sample_tsne = _load_pickle(os.path.join(results_dir, "sample_bray_tsne.pkl"))
    seq_tsne = _load_pickle(os.path.join(results_dir, "seq_bray_tsne.pkl"))

    # --- correlation network (optional) -------------------------------
    network_df = None
    net_path = os.path.join(results_dir, "sparcc_correlations.pkl")
    if os.path.isfile(net_path):
        network_df = _load_pickle(net_path)
        if not isinstance(network_df, pd.DataFrame):
            network_df = None

    # --- assemble and write each JSON artefact -------------------------
    samples = _build_samples(seqtab, sample_tsne, metadata)
    _write_json(samples, os.path.join(output_dir, "samples.json"))

    # seq_to_id maps sequences to compact ids shared by the later files.
    asvs, seq_to_id = _build_asvs(seqtab, seq_tsne, renorm_table_list, taxonomy_dict)
    _write_json(asvs, os.path.join(output_dir, "asvs.json.gz"), compress=True)

    counts = _build_counts(seqtab, seq_to_id)
    _write_json(counts, os.path.join(output_dir, "counts.json.gz"), compress=True)

    network = _build_network(network_df, seq_to_id)
    _write_json(network, os.path.join(output_dir, "network.json"))

    taxonomy = _build_taxonomy(taxonomy_dict, seq_to_id)
    _write_json(taxonomy, os.path.join(output_dir, "taxonomy.json"))

    renorm_stats = _build_renorm_stats(renorm_data)
    _write_json(renorm_stats, os.path.join(output_dir, "renorm_stats.json"))