• Specification
    • 1. Introduction
    • 2. Basic concepts
    • 3. Data structure description language
    • 4. CAN bus transport layer
    • 5. Application level conventions
    • 6. Application level functions
    • 7. List of standard data types
    • 8. Hardware design recommendations
  • Implementations
    • Libuavcan
      • Platforms
      • Tutorials
        • 1. Library build configuration
        • 2. Node initialization and startup
        • 3. Publishers and subscribers
        • 4. Services
        • 5. Timers
        • 6. Time synchronization
        • 7. Remote node reconfiguration
        • 8. Custom data types
        • 9. Node discovery
        • 10. Dynamic node ID allocation
        • 11. Firmware update
        • 12. Multithreading
        • 13. CAN acceptance filters
      • FAQ
    • Pyuavcan
      • Examples
        • Automated ESC enumeration
        • ESC throttle control
      • Tutorials
        • 1. Setup
        • 2. Basic usage
        • 3. Advanced usage
        • 4. Parsing DSDL definitions
    • Libcanard
    • Uavcan.rs
  • GUI Tool
    • Overview
    • Examples
    • User guide
  • Examples
    • Simple sensor node
  • Forum
Implementations /  Pyuavcan /  Tutorials /  2. Basic usage

Basic usage

This tutorial will walk you through the basic usage of Pyuavcan. It is intended for execution in an interactive shell of Python 3.4 or newer. While Pyuavcan supports Python 2.7, its use is strongly discouraged.

It is recommended to keep a running instance of the UAVCAN GUI Tool on the same bus while trying the examples below. If you’re working on Linux, consider using a virtual CAN interface for experimenting.

This guide is not an extensive list of all capabilities of the library. For that, use the Python’s help system (function help()), or read the code.

Node initialization

Start a Python 3 shell and run this:

import uavcan
node = uavcan.make_node('/dev/ttyACM0')

This will create a node connected to the SLCAN interface /dev/ttyACM0, where all options are initialized to default values. The library will select the correct backend automatically by looking at the name of the interface. That is, if the interface is named like COM3 or like above, the library will choose the SLCAN backend; if the interface is named in the form of can0, the library will select SocketCAN.

A more useful node instantiation example is shown below:

import uavcan

# Instantiating an instance of standard service type uavcan.protocol.GetNodeInfo
# Here we need the response data structure, which is why we use the factory Response()
node_info = uavcan.protocol.GetNodeInfo.Response()
node_info.name = 'org.uavcan.pyuavcan_demo'
node_info.software_version.major = 1
node_info.hardware_version.unique_id = b'12345' # Setting first 5 bytes; rest will be kept zero
# Fill other fields as necessary...

node = uavcan.make_node('vcan0',
                        node_id=123,          # Setting the node ID 123
                        node_info=node_info)  # Setting node info

# Alternatively, node ID can be set after initialization,
# but only if it hasn't been set before:
node.node_id = 123

While the node is running, it is possible to change its mode, health, and vendor specific status code as follows:

node.mode = uavcan.protocol.NodeStatus().MODE_OPERATIONAL
node.health = uavcan.protocol.NodeStatus().HEALTH_WARNING
node.vendor_specific_status_code = 12345

The library expects that it nearly always has a thread of execution blocked inside the spin() method of the node class. It is allowed to deviate from this requirement if the application does not have strict real-time requirements by temporarily dispatching control to other tasks, or by invoking the spin method periodically (at least 100 times per second) in a non-blocking way with zero timeout. Thus, the application should always keep one thread in the node’s spin() function.

while True:
    try:
        node.spin()             # Spin forever or until an exception is thrown
    except UAVCANException as ex:
        print('Node error:', ex)

Another popular use case:

while True:
    try:
        node.spin(1)            # Spin for 1 second or until an exception is thrown
        perform_other_tasks()
        # time.sleep(0.1)       # Never do this!
    except UAVCANException as ex:
        print('Node error:', ex)

In general, if the application needs to perform other blocking tasks, a separate thread should be spawned. In the next section we’ll learn how to run non-blocking parallel tasks from the node thread by using the callback scheduler embedded in the node.

When you’re finished with the node, don’t forget to call close() on it; better yet, use contextlib:

from contextlib import closing
with closing(uavcan.driver.make_node('/dev/ttyACM0', bitrate=1000000)) as node:
    # Do some work here...
# When control reaches the end of the with block, the node will be properly finalized

Invoking callbacks from the node thread

In order to keep the node running in the background while having the shell available for input, let’s spawn a thread and dispatch it into the spin() method. This approach works fine for experimenting, however, it must never be used in production because the library is not thread safe.

import threading
threading.Thread(target=node.spin, daemon=True).start()

It is recommended to launch the UAVCAN GUI tool now to make sure that the node can be seen online.

Now we’ll see how to invoke callbacks once or periodically from the node thread:

def periodic_callback_1hz():
    print('Periodic')

def deferred_call():
    print('Deferred call')

# Invoking a function every second.
periodic_handle = node.periodic(1, periodic_callback_1hz)

# Invoking a function once after a 0.5 second delay expires.
deferred_handle = node.defer(0.5, deferred_call)

# When we no longer want a callback to fire, the associated task can be removed.
periodic_handle.remove()

# Deferred calls get removed automatically once they were invoked.
# Removing a deferred call before its invocation can be used to cancel
# pending deferred calls if they are no longer needed.
deferred_handle.remove()

Note that all exceptions thrown from all kinds of callbacks from the node are propagated through and thrown from the spin() method. Make sure to catch them.

Loading DSDL definitions

The library automatically loads definitions of all standard DSDL definitions (from the namespace uavcan.*) immediately once the module is imported. Additional definitions, such as vendor-specific definitions, can be added later by invoking uavcan.load_dsdl() with a list of lookup paths provided in the first argument. Refer to the function documentation for more info.

Standard definitions can be reached directly from the module namespace, e.g. uavcan.protocol.NodeStatus. Vendor-specific definitions can be accessed via the pseudo-module uavcan.thirdparty. Both standard and vendor-specific types are also accessible via the global dictionaries uavcan.DATATYPES and uavcan.TYPENAMES.

When instantiating a message type, its type name should be treated as a class name, e.g. msg = uavcan.protocol.NodeStatus(). When instantiating a service request or response structure, its type name need to be appended with either Request() or Response() specifier, e.g. uavcan.protocol.GetNodeInfo.Request(), uavcan.protocol.GetNodeInfo.Response(). Direct instantiation of services is not defined by the specification, and will lead to a runtime error.

Receiving messages from the bus

def node_status_callback(event):
    print('NodeStatus message from node', event.transfer.source_node_id)
    print('Node uptime:', event.message.uptime_sec, 'seconds')
    # Messages, service requests, service responses, and entire events
    # can be converted into YAML formatted data structure using to_yaml():
    print(uavcan.to_yaml(event))

# Subscribing to messages uavcan.protocol.NodeStatus
handle = node.add_handler(uavcan.protocol.NodeStatus, node_status_callback)

# When we no longer need them, we can remove the handler
handle.remove()

Timestamping

It should be noted that Pyuavcan goes at great lengths to obtain precise timestamps of the received CAN frames, and, by extension, UAVCAN transfers (both messages and services). The exact method of timestamp estimation depends on the backend, so the most valid information can be obtained by looking at the source code of the backend of interest.

For SLCAN, if the CAN adapter provides timestamps, the library will convert them from the adapter’s clock domain into the monotonic and real time domains of the local computer using the Olson passive time synchronization algorithm. If timestamps are not provided by the adapter, the library will use naive timestamping, which is imprecise.

For SocketCAN, the library retrieves the timestamping information from the Linux kernel. Refer to the code for more information.

The computed precise timestamps will be stored in the fields event.transfer.ts_monotonic and event.transfer.ts_real for time in the local monotonic (see time.monotonic()) and real (see time.time()) domains, respectively.

Publishing messages to the bus

Pyuavcan supports different of ways to assign values to fields of DSDL data structures. Using the type constructors:

  • uavcan.protocol.debug.KeyValue(key='this is key', value=123.456)
  • uavcan.protocol.param.GetSet.Request(name='foobar')
  • uavcan.protocol.param.GetSet.Response(value=uavcan.protocol.param.Value(integer_value=123))

Values that are not specified in the constructor are zero initialized by default.

Using direct assignment:

s = uavcan.protocol.debug.KeyValue()
s.key = 'this is key'
s.value = 123.456

Messages can be broadcast using the method broadcast():

def do_publish():
    msg = uavcan.protocol.debug.KeyValue(key='this is key', value=123.456)
    # The priority argument can be omitted, in which case default will be used
    node.broadcast(msg, priority=uavcan.TRANSFER_PRIORITY_LOWEST)

handle = node.periodic(1, do_publish)

Invoking services

# The callback will be invoked ALWAYS, even if the request has timed out.
# In the case of a timeout, the argument will be None.
def response_callback(event):
    if event:
        print('Service response from server', event.transfer.source_node_id)
        print(uavcan.to_yaml(event.response))
    else:
        print('SERVICE REQUEST HAS TIMED OUT')

# The priority argument can be omitted, in which case default will be used
node.request(uavcan.protocol.GetNodeInfo.Request(), 127, response_callback,
             priority=uavcan.TRANSFER_PRIORITY_LOWEST)
node.request(uavcan.protocol.GetNodeInfo.Request(), 126, response_callback)

Responding to services

def handle_node_restart_request(event):
    print('Request from client', event.transfer.source_node_id)
    # Rejecting the request if the magic number is incorrect
    if event.request.magic_number != event.request.MAGIC_NUMBER:
        return uavcan.protocol.RestartNode.Response(ok=False)
    print('Restart request accepted')
    # Ha ha, just kidding; actual reboot in a demo application is hardly a good idea
    import os
    print('Reboot result:', os.system('reboot'))
    # Returning confirmation
    return uavcan.protocol.RestartNode.Response(ok=True)

node.add_handler(uavcan.protocol.RestartNode, handle_node_restart_request)

Using Vendor-Specific DSDL Definitions

Pyuavcan will automatically scan the directory ~/uavcan_vendor_specific_types for vendor-specific data types, where ~ stands for the home directory of the current user, e.g. /home/joe or C:\Users\Joe. Consider the following directory layout:

~
└── uavcan_vendor_specific_types
    └── sirius_cybernetics_corporation
        ├── 100.Foo.uavcan
        ├── 42.Bar.uavcan
        └── Baz.uavcan

The above layout defines the following custom data types:

  • sirius_cybernetics_corporation.Foo with the default data type ID 100.
  • sirius_cybernetics_corporation.Bar with the default data type ID 42.
  • sirius_cybernetics_corporation.Baz where the default data type ID is not set.

Obsolete

This website is dedicated to the experimental version of the protocol known as UAVCAN v0 that is now obsolete. To learn more about the stable release UAVCAN v1.0, please visit the main website.

Pyuavcan

  • Examples
    • Automated ESC enumeration
    • ESC throttle control
  • Tutorials
    • 1. Setup
    • 2. Basic usage
    • 3. Advanced usage
    • 4. Parsing DSDL definitions

License

This work is licensed under a Creative Commons Attribution 4.0 International License.
  • Discussion forum
  • GitHub organization
  • Report a problem with this website

Generated Thu, 17 Feb 2022 16:27:38 +0000 © UAVCAN development team