Automated ESC enumeration

This example demonstrates how the automated ESC enumeration feature works internally. If the concept of automated enumeration is unfamiliar, please read the UAVCAN specification first.
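
Before touching any hardware, the core bookkeeping can be modelled in plain Python. The sketch below is a simplified, hardware-free illustration and not part of PyUAVCAN: the function name `assign_esc_indices` and the node IDs are hypothetical. It assumes that each ESC announces itself with an indication when the user actuates it, that indices are handed out in the order of first indication, and that repeated indications from an already enumerated node are ignored.

```python
def assign_esc_indices(indication_sources, esc_count):
    """Map node IDs to ESC indices in the order of their first indication.

    indication_sources: iterable of node IDs, one per received indication.
    esc_count: how many distinct ESC nodes are expected.
    """
    assigned = {}
    for source_node_id in indication_sources:
        if source_node_id in assigned:
            continue  # Already enumerated; a repeated indication is ignored
        assigned[source_node_id] = len(assigned)  # Next free index
        if len(assigned) == esc_count:
            break  # Every expected ESC has an index
    return assigned


# Hypothetical node IDs; node 124 indicates twice before the others respond.
print(assign_esc_indices([124, 124, 125, 123], esc_count=3))
```

The order in which the user actuates the motors is therefore what determines the resulting index order.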

Writing the script

#!/usr/bin/env python3

import time

import uavcan


# Waiting until new nodes stop appearing online.
# That would mean that all nodes that are connected to the bus are now online and ready to work.
def wait_for_all_nodes_to_become_online():
    num_nodes = 0
    while True:
        node.spin(timeout=10)
        new_num_nodes = len(dynamic_node_id_allocator.get_allocation_table())
        if new_num_nodes == num_nodes and num_nodes > 1:
            break

        num_nodes = new_num_nodes


# Determining how many ESC nodes are present.
# In real use cases the number of ESCs should be obtained from elsewhere, e.g. from the control mixer settings.
# PyUAVCAN has a helper class that automates what we're doing here,
# but we're not using it so that what happens at the protocol level stays visible.
def detect_esc_nodes():
    esc_nodes = set()
    handle = node.add_handler(uavcan.equipment.esc.Status, lambda event: esc_nodes.add(event.transfer.source_node_id))
    try:
        node.spin(timeout=3)            # Collecting ESC status messages, thus determining which nodes are ESC
    finally:
        handle.remove()

    return esc_nodes


# Enumerating the ESCs.
# This example uses blocking code for simplicity;
# real applications will most likely use asynchronous (callback-based) code
# or run the logic in a dedicated thread.
# Converting synchronous code to asynchronous or multithreaded code is a general
# programming matter, so it is not covered in this demo.
def enumerate_all_esc(esc_nodes, timeout=60):
    begin_responses_succeeded = 0

    def begin_response_checker(event):
        nonlocal begin_responses_succeeded
        if not event:
            raise Exception('Request timed out')

        if event.response.error != event.response.ERROR_OK:
            raise Exception('Enumeration rejected\n' + uavcan.to_yaml(event))

        begin_responses_succeeded += 1

    overall_deadline = time.monotonic() + timeout

    print('Starting enumeration on all nodes...')
    begin_request = uavcan.protocol.enumeration.Begin.Request(timeout_sec=timeout)
    for node_id in esc_nodes:
        print('Sending enumeration begin request to', node_id)
        node.request(begin_request, node_id, begin_response_checker)

    while begin_responses_succeeded < len(esc_nodes):
        node.spin(0.1)

    print('Listening for indications...')
    enumerated_nodes = []
    next_index = 0
    while set(enumerated_nodes) != esc_nodes:
        received_indication = None

        def indication_callback(event):
            nonlocal received_indication
            if event.transfer.source_node_id in enumerated_nodes:
                print('Indication callback from node %d ignored - already enumerated' % event.transfer.source_node_id)
            else:
                print(uavcan.to_yaml(event))
                received_indication = event

        indication_handler = node.add_handler(uavcan.protocol.enumeration.Indication, indication_callback)
        print('=== PROVIDE ENUMERATION FEEDBACK ON ESC INDEX %d NOW ===' % next_index)
        print('=== e.g. turn the motor, press the button, etc, depending on your equipment ===')
        try:
            while received_indication is None:
                node.spin(0.1)
                if time.monotonic() > overall_deadline:
                    raise Exception('Process timed out')
        finally:
            indication_handler.remove()

        target_node_id = received_indication.transfer.source_node_id
        print('Indication received from node', target_node_id)

        print('Stopping enumeration on node', target_node_id)
        begin_responses_succeeded = 0
        node.request(uavcan.protocol.enumeration.Begin.Request(), target_node_id, begin_response_checker)
        while begin_responses_succeeded < 1:
            node.spin(0.1)

        print('Setting config param %r to %r...' % (received_indication.message.parameter_name.decode(), next_index))
        configuration_finished = False

        def param_set_response(event):
            if not event:
                raise Exception('Request timed out')

            assert event.response.name == received_indication.message.parameter_name
            assert event.response.value.integer_value == next_index
            print(uavcan.to_yaml(event))
            node.request(uavcan.protocol.param.ExecuteOpcode.Request(
                             opcode=uavcan.protocol.param.ExecuteOpcode.Request().OPCODE_SAVE),
                         target_node_id,
                         param_opcode_response)

        def param_opcode_response(event):
            nonlocal configuration_finished
            if not event:
                raise Exception('Request timed out')

            print(uavcan.to_yaml(event))
            if not event.response.ok:
                raise Exception('Param opcode execution rejected\n' + uavcan.to_yaml(event))
            else:
                configuration_finished = True

        node.request(uavcan.protocol.param.GetSet.Request(value=uavcan.protocol.param.Value(integer_value=next_index),
                                                          name=received_indication.message.parameter_name),
                     target_node_id,
                     param_set_response)

        while not configuration_finished:
            node.spin(0.1)

        print('Node', target_node_id, 'assigned ESC index', next_index)
        next_index += 1
        enumerated_nodes.append(target_node_id)
        print('Enumerated so far:', enumerated_nodes)

    return enumerated_nodes


if __name__ == '__main__':
    # Initializing a UAVCAN node instance.
    # In this example we're using an SLCAN adapter on the port '/dev/ttyACM0'.
    # PyUAVCAN also supports other types of adapters, refer to its docs to learn more.
    node = uavcan.make_node('/dev/ttyACM0', node_id=10, bitrate=1000000)

    # Initializing a dynamic node ID allocator.
    # This would not be necessary if the nodes were configured to use static node ID.
    node_monitor = uavcan.app.node_monitor.NodeMonitor(node)
    dynamic_node_id_allocator = uavcan.app.dynamic_node_id.CentralizedServer(node, node_monitor)

    print('Waiting for all nodes to appear online, this should take less than a minute...')
    wait_for_all_nodes_to_become_online()
    print('Online nodes:', [node_id for _, node_id in dynamic_node_id_allocator.get_allocation_table()])

    print('Detecting ESC nodes...')
    esc_nodes = detect_esc_nodes()
    print('ESC nodes:', esc_nodes)

    enumerated_esc = enumerate_all_esc(esc_nodes)
    print('All ESC enumerated successfully; index order is as follows:', enumerated_esc)
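
The waiting step at the top of the script relies on a simple stabilization rule: keep polling until the node count stops changing and is greater than one. A minimal, hardware-free sketch of that rule follows. The helper name `wait_until_stable` and the simulated counts are hypothetical and used only for illustration; in the script itself each poll is preceded by `node.spin(timeout=10)`.

```python
def wait_until_stable(poll, min_count=2):
    """Call poll() until it returns the same count twice in a row
    and that count is at least min_count, then return the count."""
    previous = 0
    while True:
        current = poll()
        if current == previous and previous >= min_count:
            return current
        previous = current


# Simulated allocation table sizes observed on successive polls.
samples = iter([1, 2, 3, 3])
print(wait_until_stable(lambda: next(samples)))  # prints 3
```

With a 10 second spin between polls, this rule means the script proceeds only after no new node has appeared for one full polling interval.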

Running the code

Connect the ESCs to the CAN bus (use more than one; with a single ESC the auto-enumeration procedure is rather pointless), start the script, and follow its instructions.

For each ESC you will see output similar to this:

=== PROVIDE ENUMERATION FEEDBACK ON ESC INDEX 0 NOW ===
=== e.g. turn the motor, press the button, etc, depending on your equipment ===
### Message from 124 to All  ts_mono=94836.894482  ts_real=1470647890.850445
value:
  empty:
    {}
parameter_name: 'esc_index' # [101, 115, 99, 95, 105, 110, 100, 101, 120]
Indication received from node 124
Stopping enumeration on node 124
Setting config param 'esc_index' to 0...
### Response from 124 to 10  ts_mono=94837.077415  ts_real=1470647891.033378
value:
  integer_value: 0
default_value:
  integer_value: 0
max_value:
  integer_value: 15
min_value:
  integer_value: 0
name: 'esc_index' # [101, 115, 99, 95, 105, 110, 100, 101, 120]
### Response from 124 to 10  ts_mono=94837.122424  ts_real=1470647891.078387
argument: 0
ok: true
Node 124 assigned ESC index 0
Enumerated so far: [124]
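
In the dump above, `parameter_name` and `name` are shown both as text and as a list of byte values, which is why the script calls `.decode()` before printing the parameter name. As a small plain-Python illustration (independent of PyUAVCAN, with variable names chosen here for the example), the byte list from the output converts to the readable name like this:

```python
# Byte values copied from the sample output above.
raw_name = [101, 115, 99, 95, 105, 110, 100, 101, 120]

# Each value is an ASCII code; bytes() packs them and decode() yields a str.
decoded_name = bytes(raw_name).decode('ascii')
print(decoded_name)  # prints: esc_index
```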

A relevant video

The following video demonstrates how automated enumeration works at the user level. Although it was not recorded with the script shown in this example, the core principles are the same.

Obsolete

This website is dedicated to the experimental version of the protocol known as UAVCAN v0 that is now obsolete. To learn more about the stable release UAVCAN v1.0, please visit the main website.

License

This work is licensed under a Creative Commons Attribution 4.0 International License.

Generated Thu, 17 Feb 2022 16:27:38 +0000 © UAVCAN development team