Getting Started with Verifications Using cocotb/cocotbext-ndk
In this section, you will learn how to create a basic test for a flow/storage hardware component (such as a pipe, FIFO, etc.).
To get started, first create a cocotb folder in the directory where the tested component is located, and put all the scripts implemented in this tutorial into it.
Creating a Test
As an example, a simple test of an MVB FIFOX component will be used.
It can be found at ndk-fpga/comp/mvb_tools/storage/fifox/cocotb/cocotb_test.py.
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2025 CESNET z. s. p. o.
# Author(s): Ondřej Schwarz <ondrejschwarz@cesnet.cz>
#            Daniel Kondys <kondys@cesnet.cz>


# importing required modules
import itertools

import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge, ClockCycles
from cocotbext.ofm.mvb.drivers import MVBDriver
from cocotbext.ofm.mvb.monitors import MVBMonitor
from cocotbext.ofm.ver.generators import random_integers
from cocotb_bus.drivers import BitDriver
from cocotb_bus.scoreboard import Scoreboard
from cocotbext.ofm.utils.throughput_probe import ThroughputProbe, ThroughputProbeMvbInterface
from cocotbext.ofm.base.generators import ItemRateLimiter
from cocotbext.ofm.mvb.transaction import MvbTrClassic

# definition of the class encapsulating components of the test
class testbench():
    # dut = device tree of the tested component
    def __init__(self, dut, debug=False):
        self.dut = dut

        # setting up the input driver and connecting it to signals beginning with "RX"
        self.stream_in = MVBDriver(dut, "RX", dut.CLK)

        # setting up the output monitor and connecting it to signals beginning with "TX"
        self.stream_out = MVBMonitor(dut, "TX", dut.CLK, tr_type=MvbTrClassic)

        # setting up the driver of DST_RDY so that it toggles between 0 and 1
        self.backpressure = BitDriver(dut.TX_DST_RDY, dut.CLK)

        # setting up the probe measuring throughput
        self.throughput_probe = ThroughputProbe(ThroughputProbeMvbInterface(self.stream_out), throughput_units="items")
        self.throughput_probe.set_log_period(10)
        self.throughput_probe.add_log_interval(0, None)

        # counter of sent transactions
        self.pkts_sent = 0

        # list of the transactions that are expected to be received
        self.expected_output = []

        # setting up a scoreboard which compares received transactions with the expected transactions
        self.scoreboard = Scoreboard(dut)

        # linking a monitor with its expected output
        self.scoreboard.add_interface(self.stream_out, self.expected_output)

        # setting up the logging level
        if debug:
            self.stream_in.log.setLevel(cocotb.logging.DEBUG)
            self.stream_out.log.setLevel(cocotb.logging.DEBUG)

    # method for adding transactions to the expected output
    def model(self, transaction):
        """Model the DUT based on the input transaction"""
        self.expected_output.append(transaction)
        self.pkts_sent += 1

    # method performing a hardware reset
    async def reset(self):
        self.dut.RESET.value = 1
        await ClockCycles(self.dut.CLK, 10)
        self.dut.RESET.value = 0
        await RisingEdge(self.dut.CLK)


# defining a test - functions with "@cocotb.test()" decorator will be automatically found and run
@cocotb.test()
async def run_test(dut, pkt_count=10000):
    # start a clock generator
    cocotb.start_soon(Clock(dut.CLK, 5, units="ns").start())

    # initialization of the test bench
    tb = testbench(dut, debug=False)

    # change MVB driver's IdleGenerator to ItemRateLimiter
    # note: the RateLimiter's rate is affected by backpressure (DST_RDY).
    # Even though it takes into account cycles with DST_RDY=0, the desired rate might not be achievable.
    idle_gen_conf = dict(random_idles=True, max_idles=5, zero_idles_chance=50)
    tb.stream_in.set_idle_generator(ItemRateLimiter(rate_percentage=30, **idle_gen_conf))

    # running simulated reset
    await tb.reset()

    # starting the BitDriver (DST_RDY high for 1 cycle, then low for i % 5 cycles)
    tb.backpressure.start((1, i % 5) for i in itertools.count())

    # dynamically getting the width of the data signal that will be set (useful if the width of the signal may change)
    data_width = tb.stream_in.item_widths["data"]

    # generating MVB items as random integers between the minimum and maximum unsigned value of the item
    for transaction in random_integers(0, 2**data_width-1, pkt_count):

        # logging the generated transaction
        cocotb.log.debug(f"generated transaction: {hex(transaction)}")

        # initializing MVB transaction object
        mvb_tr = MvbTrClassic()

        # setting data of MVB transaction to the generated integer
        mvb_tr.data = transaction

        # appending the transaction to tb.expected_output
        tb.model(mvb_tr)

        # passing the transaction to the driver which then writes it to the bus
        tb.stream_in.append(mvb_tr)

    last_num = 0

    # checking if all the expected packets have been received
    while (tb.stream_out.item_cnt < pkt_count):

        # logging number of received packets after every 1000 packets
        if (tb.stream_out.item_cnt // 1000) > last_num:
            last_num = tb.stream_out.item_cnt // 1000
            cocotb.log.info(f"Number of transactions processed: {tb.stream_out.item_cnt}/{pkt_count}")

        # if not all packets have been received yet, waiting 100 cycles so the simulation doesn't stop prematurely
        await ClockCycles(dut.CLK, 100)

    # logging values measured by throughput probe
    tb.throughput_probe.log_max_throughput()
    tb.throughput_probe.log_average_throughput()

    # displaying result of the test
    raise tb.scoreboard.result
This test can be used as a template for tests of basic flow and storage components, and can be easily adapted for most other verifications.
It consists of two basic parts: the testbench class and the test itself.
The testbench is the more reusable of the two: it usually changes little between components, so it can be copied and adapted. Its purpose is to initialize and encapsulate the objects that drive the test. It sets up the drivers, monitors, scoreboard, and expected-output lists, plus optional objects such as a bit driver for ready signals or throughput probes. It also implements a simulated reset.
The second part is the test part. It can consist of one test (typical for simple components) or multiple tests (more common for larger designs, such as the whole firmware of a card). Every test must have the @cocotb.test() decorator and be async.
A test begins by starting the clock, initializing the testbench, and performing a reset.
After the reset, a bit driver is started to test the component’s reaction to backpressure (DST_RDY).
Random data is then generated, either directly as transactions using random_transactions or as random integers that are then inserted into transaction objects (as in the test above). Each generated transaction is passed to the model method of the testbench, which appends it to the expected_output list. The transaction is also inserted into the driver’s send queue using the append method, from where it is written onto the bus.
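The generator passed to tb.backpressure.start() in the test above can be inspected without a simulator. The sketch below assumes that the bit driver interprets each yielded tuple as (cycles high, cycles low), which is how the cocotb-bus BitDriver is understood to work; expand_on_off is a hypothetical helper written only for this illustration.

```python
import itertools


def expand_on_off(pattern, cycles):
    """Expand (on, off) tuples into a per-cycle list of 0/1 levels."""
    levels = []
    for on, off in pattern:
        levels.extend([1] * on)   # signal held high for `on` cycles
        levels.extend([0] * off)  # then held low for `off` cycles
        if len(levels) >= cycles:
            return levels[:cycles]
    return levels  # a finite pattern ran out before `cycles` was reached


# Same generator expression as in the test above.
pattern = ((1, i % 5) for i in itertools.count())

# One full period of the pattern is 1 + 2 + 3 + 4 + 5 = 15 cycles long.
dst_rdy = expand_on_off(pattern, 15)
duty_cycle = sum(dst_rdy) / len(dst_rdy)

print(dst_rdy)
print(f"DST_RDY is high in {duty_cycle:.0%} of cycles")
```

Under this assumption, the output side accepts data in roughly one third of the clock cycles. This is the kind of limit that the note about ItemRateLimiter in the test refers to: a requested rate can only be reached if the backpressure pattern leaves enough ready cycles.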
The data is then read from the bus by a monitor, which should pack it into a transaction of the same type as was modeled and pass it to the test’s scoreboard via a callback. The scoreboard pops the transaction from the front of the expected output queue that the monitor is connected to and compares this transaction with the transaction it received from the monitor. If they are not the same, a test failure is raised.
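The comparison described above can be sketched in plain Python. MiniScoreboard is a hypothetical stand-in written for this illustration, not the cocotb-bus Scoreboard class, and plain integers are used in place of MVB transaction objects.

```python
from collections import deque


class MiniScoreboard:
    """Hypothetical stand-in for a scoreboard; not the cocotb-bus class."""

    def __init__(self):
        self.expected = deque()  # filled by the model, oldest item first
        self.errors = []

    def on_receive(self, transaction):
        """Callback that a monitor would invoke for each received transaction."""
        if not self.expected:
            self.errors.append(f"unexpected transaction: {transaction!r}")
            return
        want = self.expected.popleft()  # pop from the front of the queue
        if want != transaction:
            self.errors.append(f"expected {want!r}, got {transaction!r}")

    @property
    def passed(self):
        # Leftover expected items mean that something was never received.
        return not self.errors and not self.expected


sb = MiniScoreboard()
sb.expected.extend([0x1A, 0x2B, 0x3C])
for item in (0x1A, 0x2B, 0x3C):
    sb.on_receive(item)
print("all matched:", sb.passed)

# A mismatching transaction is recorded as an error.
sb.expected.append(0x4D)
sb.on_receive(0x4E)
print("errors:", sb.errors)
```

Because transactions are compared with the equality operator, this only works if the transaction type defines a meaningful comparison, which is why the monitor should produce the same transaction type as the model.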
A waiting loop is implemented to ensure that the test doesn’t report scoreboard results prematurely before all the transactions have been received. Otherwise, the scoreboard may receive a different number of transactions than it expected, which will lead to an error.
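The waiting loop in the test above runs until the expected count is reached, so it would never finish if the component dropped a transaction. One possible variant, sketched here with asyncio standing in for the simulator, bounds the number of polling steps. The names wait_for_items and FakeMonitor are hypothetical; in a real test, the polling step would be an await on ClockCycles and the counter would be tb.stream_out.item_cnt.

```python
import asyncio


async def wait_for_items(get_count, target, wait_step, max_steps):
    """Poll get_count() until it reaches target or the step budget runs out."""
    for _ in range(max_steps):
        if get_count() >= target:
            return True
        await wait_step()
    return get_count() >= target


class FakeMonitor:
    """Hypothetical monitor that receives a fixed number of items per step."""

    def __init__(self, items_per_step):
        self.item_cnt = 0
        self.items_per_step = items_per_step

    async def step(self):
        # In cocotb this would be `await ClockCycles(dut.CLK, 100)`.
        await asyncio.sleep(0)
        self.item_cnt += self.items_per_step


# A monitor that makes progress reaches the target within the budget.
mon = FakeMonitor(items_per_step=40)
done = asyncio.run(wait_for_items(lambda: mon.item_cnt, 100, mon.step, max_steps=10))
print("finished:", done, "items:", mon.item_cnt)

# A stalled monitor exhausts the budget instead of waiting forever.
stalled = FakeMonitor(items_per_step=0)
timed_out = not asyncio.run(
    wait_for_items(lambda: stalled.item_cnt, 100, stalled.step, max_steps=10)
)
print("timed out:", timed_out)
```

Whether such a bound is worth adding depends on the environment: a simulator-level timeout can serve the same purpose without changing the test.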
After all the packets are received, tb.scoreboard.result is raised, and the test results are shown.
Running the Test
To successfully build and run the simulation, it’s necessary to implement a couple more files.
Examples of these can again be found in the ndk-fpga/comp/mvb_tools/storage/fifox/cocotb/ folder.
First, it is necessary to implement a pyproject.toml with all test dependencies listed:
[project]
name = "cocotb-hash-table-simple-test"
version = "0.1.0"
dependencies = [
    "cocotbext-ofm @ ${NDK_FPGA_COCOTBEXT_OFM_URL}",
    "setuptools",
]

[build-system]
requires = ["pdm-backend"]
build-backend = "pdm.backend"
Then, a prepare.sh script is required. This script should create a Python virtual environment and use the pyproject.toml file created in the previous step to install all the dependencies into the environment.
It usually looks something like this:
#!/bin/sh
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2025 CESNET z. s. p. o.
# Author(s): Ondřej Schwarz <ondrejschwarz@cesnet.cz>

NDK_FPGA_PATH=../../../../..
source $NDK_FPGA_PATH/env.sh

ndk_fpga_venv_prepare "venv-fifox"

pip install .

echo ""
echo "Now activate environment with:"
echo "source venv-fifox/bin/activate"
Use a special cocotb_test_sig.fdo file to define the signals that will be displayed in the simulator’s waveform.
# cocotb_test_sig.fdo : Include file with signals
# Copyright (C) 2024 CESNET z. s. p. o.
# Author(s): Jakub Cabal <cabal@cesnet.cz>
#
# SPDX-License-Identifier: BSD-3-Clause

view wave
delete wave *

add_wave -group {ALL} -noupdate -hex /mvb_fifox/*

config wave -signalnamewidth 1
Finally, create a Makefile that will run the simulation:
# Makefile: Makefile to compile module
# Copyright (C) 2024 CESNET z. s. p. o.
# Author(s): Ondřej Schwarz <Ondrej.Schwarz@cesnet.cz>
#
# SPDX-License-Identifier: BSD-3-Clause

TOP_LEVEL_ENT=mvb_fifox
TARGET=cocotb

.PHONY: all
all: comp

include ../../../../../build/Makefile
Note
Don’t forget to adjust the values that are component-specific and the relative paths if needed.
You can run the simulation by executing the prepare.sh script, entering the created virtual environment, and running the Makefile. All of this can be achieved with this one-liner:
. ./prepare.sh && make