Developing Scientific Software
In this article we will follow the tenets of TDD for developing Scientific Software as laid out in the first installment of this series to develop an edge detection filter known as the Sobel filter.
In the first article, we talked about how important – and tricky – it can be to develop reliable testing methods for software which the complex problems often found in scientific software. We also saw how to overcome those issues by following a development cycle inspired by TDD, but adapted for scientific computing. I reproduce a shortened version of these instructions below.
Implementation cycle
- Gather requirements
- Sketch the design
- Implement initial tests
- Implement your alpha version
- Build an oracle library
- Revisit tests
- Implement profiling
Optimization cycle
- Optimize
- Reimplement
New method cycle
- Implement new method
- Validate against previous curated oracles
Getting Started: The Sobel Filter
In this article, we will follow the above instructions to develop a function which applies the Sobel filter. The Sobel filter is a commonly used computer vision tool to detect or enhance edges in images. Keep reading to see some examples!
Starting with the first step, we gather some requirements. We will follow the standard formulation of the Sobel filter described in this article. Simply put, the Sobel operator consists of convolving image A with the following two 3 × 3 kernels, squaring each pixel in the resulting images, summing them and taking the point-by-point square root. If Ax and Ay are the images resulting from the convolutions, the result of the Sobel filter S is √(Ax² + Ay²).
Requirements
We want this function to take any 2D array and generate another 2D array. We might want it to operate on any two axes of an ndarray. We might even want it to work on more (or less) than two axes. We might have specifications for how to handle the edges of the array.
For now let's remember to keep it simple, and start with a 2D implementation. But before we do that, let's sketch the design.
Sketch the Design
We start with a simple design, leveraging Python's annotations. I highly recommend annotating as much as possible, and using mypy as a linter.
from typing import Tuple
from numpy.core.multiarray import normalize_axis_index
from numpy.typing import NDArray
def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
    """Compute the Sobel filter of an image
    Parameters
    ----------
    arr : NDArray
        Input image
    axes : Tuple[int, int], optional
        Axes over which to compute the filter, by default (-2, -1)
    Returns
    -------
    NDArray
        Output
    """
    # Only accepts 2D arrays
    if arr.ndim != 2:
        raise NotImplementedError
    # Ensure that the axis[0] is the first axis, and axis[1] is the second
    # axis. The obscure `normalize_axis_index` converts negative indices to
    # indices between 0 and arr.ndim - 1.
    if any(
        normalize_axis_index(ax, arr.ndim) != i
        for i, ax in zip(range(2), axes)
    ):
        raise NotImplementedError
    passImplement Tests
This function doesn't do much. But it is documented, annotated, and its current limitations are baked into it. Now that we have a design, we immediately pivot to tests.
First, we notice that empty images (all zeroes) have no edges. So they must output zeroes as well. In fact, any constant image should also return zeros. Let's bake that into out first tests. We will also see how we can use monkey testing to test many arrays.
# test_zero_constant.py
import numpy as np
import pytest
# Test multiple dtypes at once
@pytest.mark.parametrize(
    "dtype",
    ["float16", "float32", "float64", "float128"],
)
def test_zero(dtype):
    # Set random seed
    seed = int(np.random.rand() * (2**32 - 1))
    np.random.seed(seed)
    # Create a 2D array of random shape and fill with zeros
    nx, ny = np.random.randint(3, 100, size=(2,))
    arr = np.zeros((nx, ny), dtype=dtype)
    # Apply sobel function
    arr_sob = sobel(arr)
    # `assert_array_equal` should fail most of the times.
    # It will only work when `arr_sob` is identically zero,
    # which is usually not the case.
    # DO NOT USE!
    # np.testing.assert_array_equal(
    #     arr_sob, 0.0, err_msg=f"{seed=} {nx=}, {ny=}"
    # )
    # `assert_almost_equal` can fail when used with high decimals.
    # It also relies on float64 checking, which might fail for
    # float128 types.
    # DO NOT USE!
    # np.testing.assert_almost_equal(
    #     arr_sob,
    #     np.zeros_like(arr),
    #     err_msg=f"{seed=} {nx=}, {ny=}",
    #     decimal=4,
    # )
    # `assert_allclose` with custom tolerance is my preferred method
    # The 10 is arbitrary and depends on the problem. If a method
    # which you know to be correct does not pass, increase to 100, etc.
    # If the tolerance needed to make the tests pass is too high, make
    # sure the method is actually correct.
    tol = 10 * np.finfo(arr.dtype).eps
    err_msg = f"{seed=} {nx=}, {ny=} {tol=}"  # Log seeds and other info
    np.testing.assert_allclose(
        arr_sob,
        np.zeros_like(arr),
        err_msg=err_msg,
        atol=tol,  # rtol is useless for desired=zeros
    )
@pytest.mark.parametrize(
    "dtype", ["float16", "float32", "float64", "float128"]
)
def test_constant(dtype):
    seed = int(np.random.rand() * (2**32 - 1))
    np.random.seed(seed)
    nx, ny = np.random.randint(3, 100, size=(2,))
    constant = np.random.randn(1).item()
    arr = np.full((nx, ny), fill_value=constant, dtype=dtype)
    arr_sob = sobel(arr)
    tol = 10 * np.finfo(arr.dtype).eps
    err_msg = f"{seed=} {nx=}, {ny=} {tol=}"
    np.testing.assert_allclose(
        arr_sob,
        np.zeros_like(arr),
        err_msg=err_msg,
        atol=tol,  # rtol is useless for desired=zeros
    )This code snippet can be run from the command-line with
$ pytest -qq -s -x -vv --durations=0 test_zero_constant.pyAlpha Version
Of course our tests will currently fail, but they are enough for now. Let's implement a first version.
from typing import Tuple
import numpy as np
from numpy.core.multiarray import normalize_axis_index
from numpy.typing import NDArray
def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:
    if arr.ndim != 2:
        raise NotImplementedError
    if any(
        normalize_axis_index(ax, arr.ndim) != i
        for i, ax in zip(range(2), axes)
    ):
        raise NotImplementedError
    # Define our filter kernels. Notice they inherit the input type, so
    # that a float32 input never has to be cast to float64 for computation.
    # But can you see where using another dtype for Gx and Gy might make
    # sense for some input dtypes?
    Gx = np.array(
        [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],
        dtype=arr.dtype,
    )
    Gy = np.array(
        [[-1, -2, -1], [0, 0, 0], [1, 2, 1]],
        dtype=arr.dtype,
    )
    # Create the output array and fill with zeroes
    s = np.zeros_like(arr)
    for ix in range(1, arr.shape[0] - 1):
        for iy in range(1, arr.shape[1] - 1):
            # Pointwise multiplication followed by sum, aka convolution
            s1 = (Gx * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
            s2 = (Gy * arr[ix - 1 : ix + 2, iy - 1 : iy + 2]).sum()
            s[ix, iy] = np.hypot(s1, s2)  # np.sqrt(s1**2 + s2**2)
    return sWith this new function, all our tests should pass, and we should get an output like this:
$ pytest -qq -s -x -vv --durations=0 test_zero_constant.py
........
======================================== slowest durations =========================================
0.09s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float16]
0.08s call     t_049988eae7f94139a7067f142bf2852f.py::test_zero[float64]
0.06s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float128]
0.04s call     t_049988eae7f94139a7067f142bf2852f.py::test_zero[float128]
0.04s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float64]
0.02s call     t_049988eae7f94139a7067f142bf2852f.py::test_constant[float32]
0.01s call     t_049988eae7f94139a7067f142bf2852f.py::test_zero[float16]
(17 durations < 0.005s hidden.  Use -vv to show these durations.)
8 passed in 0.35sWe have so far:
- Gathered requirements of our problem.
- Sketched an initial design.
- Implemented some tests.
- Implemented the alpha version, which passes these tests.
These tests provide verification for future versions, as well as a very rudimentary oracle library. But while unit tests are great at capturing minute deviations from expected results, they are not great at differentiating wrong results from quantitatively different – but still correct – results. Suppose tomorrow we want to implement another Sobel-type operator, which has a longer kernel. We don't expect the results to match exactly to our current version, but we do expect the outputs of both functions to be qualitatively very similar.
In addition, it is excellent practice to try many different inputs to our functions to get a sense of how they behave for different inputs. This ensures that we scientifically validate the results.
Scikit-image has an excellent library of images, which we can use to create oracles.
# !pip installscikit-image pooch
from typing import Dict, Callable
import numpy as np
import skimage.data
bwimages: Dict[str, np.ndarray] = {}
for attrname in skimage.data.__all__:
    attr = getattr(skimage.data, attrname)
    # Data are obtained via function calls
    if isinstance(attr, Callable):
        try:
            # Download the data
            data = attr()
            # Ensure it is a 2D array
            if isinstance(data, np.ndarray) and data.ndim == 2:
                # Convert from various int types to float32 to better
                # assess precision
                bwimages[attrname] = data.astype(np.float32)
        except:
            continue
# Apply sobel to images
bwimages_sobel = {k: sobel(v) for k, v in bwimages.items()}Once we have the data, we can plot it.
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
def create_colorbar(im, ax):
    divider = make_axes_locatable(ax)
    cax = divider.append_axes("right", size="5%", pad=0.1)
    cb = ax.get_figure().colorbar(im, cax=cax, orientation="vertical")
    return cax, cb
for name, data in bwimages.items():
    fig, axs = plt.subplots(
        1, 2, figsize=(10, 4), sharex=True, sharey=True
    )
    im = axs[0].imshow(data, aspect="equal", cmap="gray")
    create_colorbar(im, axs[0])
    axs[0].set(title=name)
    im = axs[1].imshow(bwimages_sobel[name], aspect="equal", cmap="gray")
    create_colorbar(im, axs[1])
    axs[1].set(title=f"{name} sobel")These look really good! I would recommend storing these, both as arrays and as figures which I can quickly check against for a new version. I highly recommend HD5F for array storage. You can also set up a Sphinx Gallery to directly generate the figures them when updating documentation, that way you have a reproducible figure build that you can use to check against previous versions.
After the results have been validated, you can store them on disk and use them as part of your unit testing. Something like this:
oracle_library = [(k, v, bwimages_sobel[k]) for k, v in bwimages.items()]
# save_to_disk(oracle_library, ...)# test_oracle.py
import numpy as np
import pytest
from numpy.typing import NDArray
# oracle_library = read_from_disk(...)
@pytest.mark.parametrize("name,input,output", oracle_library)
def test_oracles(name: str, input: NDArray, output: NDArray):
    output_new = sobel(input)
    tol = 10 * np.finfo(input.dtype).eps
    mean_avg_error = np.abs(output_new - output).mean()
    np.testing.assert_allclose(
        output_new,
        output,
        err_msg=f"{name=} {tol=} {mean_avg_error=}",
        atol=tol,
        rtol=tol,
    )Profiling
Computing the Sobel filter for these datasets took a while! So the next step is to profile the code. I recommend creating a benchmark_xyz.py file for each test, and re-run them regularly to probe for performance regressions. This can even be part of your unit testing, but we won't go so far in this example. Another idea is to use the oracles above for benchmarking.
There are many ways of timing code execution. To get the system-wide, "real" elapsed time, use perf_counter_ns from the built-in time module to measure time in nanoseconds. In a Jupyter notebook, the built-in %%timeit cell magic times a certain cell execution. The decorator below is inspired by this cell magic and can be used to time any function.
import time
from functools import wraps
from typing import Callable, Optional
def sizeof_fmt(num, suffix="s"):
    for unit in ["n", "μ", "m"]:
        if abs(num) < 1000:
            return f"{num:3.1f} {unit}{suffix}"
        num /= 1000
    return f"{num:.1f}{suffix}"
def timeit(
    func_or_number: Optional[Callable] = None,
    number: int = 10,
) -> Callable:
    """Apply to a function to time its executions.
    Parameters
    ----------
    func_or_number : Optional[Callable], optional
        Function to be decorated or `number` argument (see below), by
        default None
    number : int, optional
        Number of times the function will run to obtain statistics, by
        default 10
    Returns
    -------
    Callable
        When fed a function, returns the decorated function. Otherwise
        returns a decorator.
    Examples
    --------
    .. code-block:: python
        @timeit
        def my_fun():
            pass
        @timeit(100)
        def my_fun():
            pass
        @timeit(number=3)
        def my_fun():
            pass
    """
    if isinstance(func_or_number, Callable):
        func = func_or_number
        number = number
    elif isinstance(func_or_number, int):
        func = None
        number = func_or_number
    else:
        func = None
        number = number
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            runs_ns = np.empty((number,))
            # Run first and measure store the result
            start_time = time.perf_counter_ns()
            result = f(*args, **kwargs)
            runs_ns[0] = time.perf_counter_ns() - start_time
            for i in range(1, number):
                start_time = time.perf_counter_ns()
                f(*args, **kwargs)  # Without storage, faster
                runs_ns[i] = time.perf_counter_ns() - start_time
            time_msg = f"{sizeof_fmt(runs_ns.mean())} ± "
            time_msg += f"{sizeof_fmt(runs_ns.std())}"
            print(
                f"{time_msg} per run (mean ± std. dev. of {number} runs)"
            )
            return result
        return wrapper
    if func is not None:
        return decorator(func)
    return decoratorPutting our function to the test:
arr_test = np.random.randn(500, 500)
sobel_timed = timeit(sobel)
sobel_timed(arr_test);
# 3.9s ± 848.9 ms per run (mean ± std. dev. of 10 runs)Not too fast

