Sorted reading on multiple inputs by order of timestamps

This tutorial explains how to read data sorted by order of timestamps.

When data fusion algorithms run in a multithreaded or even distributed environment, one difficulty is to maintain time coherency: the fusion must operate on data according to their measurement dates, not the dates at which they become available at the fusion stage (the different streams may have been subject to different latencies due to pre-processing, communication, etc.).

Several strategies exist in RTMaps to read data from multiple inputs. The four basic strategies are the following:

  • Asynchronous or Reactive: the component reacts every time new data arrives on any input;
  • Triggered: the component reacts every time new data arrives on the first input, then re-samples available data on the other inputs;
  • Resampling: the component executes periodically and samples its input data;
  • Resynchronizer: the component executes every time data samples with equal (or close) timestamps are available on all inputs.

This tutorial deals with an additional, advanced strategy that sorts samples by order of timestamps under a maximum latency constraint (and hence preserves time coherency even when operating in a multi-threaded, multi-process or distributed architecture). Here is how to create a component using this strategy with the Windows and Linux wizards.

Creating the component

Windows

Start the component creation wizard by clicking Tools > RTMaps 4 > New RTMaps Module. In the wizard window that pops up, enter your component's name and select "Inherits from:" SortedStartReadingComponent Template. Click OK to create the component.

Linux

Open a terminal and call the wizard with the following command: rtmaps_sdk_wizard -c PackageName ComponentName. The wizard will ask you the type of component to create. Enter the number corresponding to the "Sorted Reader based component" and hit enter.

Implementing the component

The Sorted Reader works with a superclass called MAPSSortedStartReadingComponent. This class implements the sorting and your component inherits from it.

Your newly created component has the usual Birth, Core and Death methods. Note that the superclass's implementation is called in these callbacks. But you will also notice a new method in your component: HandleNewElement. This method is an additional callback that will be called each time a data sample has been sorted by the superclass. This callback is where you should program your component's logic.

The sorting works as follows: the superclass performs a Reactive reading to get all incoming data, and assumes that timestamps on any given input are increasing. When the component receives a sample s, it waits until data with a greater timestamp has arrived on the other inputs. At that point no older sample can still arrive on those inputs, so sample s is validated and HandleNewElement is called on it.

The component has a max_latency property. This is the maximum latency that you accept for a data sample. When a data sample's latency exceeds this maximum, HandleNewElement is called with the is_late parameter set to true.

When the current sample could not be sorted in the order of timestamps, HandleNewElement is called with the is_unsorted parameter set to true, and the unsorted_samples_count output is incremented. This can happen when the sample arrived too late and its timestamp is older than those of samples that have already been processed. A large number of unsorted samples can mean that the max_latency property is too small.
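The validation rule described above can be illustrated outside RTMaps with a small standalone sketch. This is not the MAPSSortedStartReadingComponent implementation: the names Sample, Released and SortedMerger are hypothetical, the handling of max_latency is left out, and only the "wait for newer data on every other input" rule and the unsorted flag are modelled.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical types for illustration only; not part of the RTMaps SDK.
struct Sample {
    std::size_t input;      // index of the input the sample arrived on
    std::int64_t timestamp; // measurement date
};

struct Released {
    Sample sample;
    bool isUnsorted; // true when a sample with a greater timestamp was already released
};

class SortedMerger {
public:
    explicit SortedMerger(std::size_t inputCount) : queues_(inputCount) {}

    // Queue a new sample, then return every sample that is now validated.
    std::vector<Released> Push(const Sample& s) {
        assert(s.input < queues_.size());
        std::vector<Released> out;
        if (hasReleased_ && s.timestamp < lastReleased_) {
            // Too late to be sorted: hand it over immediately, flagged as unsorted.
            out.push_back({s, true});
            return out;
        }
        queues_[s.input].push_back(s);
        // Timestamps increase on each input, so once every input has something
        // queued, the oldest queued sample can no longer be overtaken.
        while (AllInputsHaveData()) {
            std::size_t oldest = 0;
            for (std::size_t i = 1; i < queues_.size(); ++i)
                if (queues_[i].front().timestamp < queues_[oldest].front().timestamp)
                    oldest = i;
            Sample next = queues_[oldest].front();
            queues_[oldest].pop_front();
            lastReleased_ = next.timestamp;
            hasReleased_ = true;
            out.push_back({next, false});
        }
        return out;
    }

private:
    bool AllInputsHaveData() const {
        for (const auto& q : queues_)
            if (q.empty()) return false;
        return true;
    }

    std::vector<std::deque<Sample>> queues_;
    std::int64_t lastReleased_ = 0;
    bool hasReleased_ = false;
};
```

In this sketch a sample on one input stays queued until every other input has delivered something, which is exactly why a real implementation needs a latency bound: a silent input would otherwise block the output forever.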

 

MAPSRBRegion

MAPSRBRegion lets you manipulate raw data as a circular buffer. It is a highly optimized and powerful tool, but not an obvious one for new users!

Introduction

With MAPSRBRegion, the data and the ring buffer associated with it are completely decoupled. MAPSRBRegion acts as an accessor to the raw data.

A region is built around the following variables:

  • offset: defines the start of the region in the ring buffer.
  • size: defines the size of the region. size=size0+size1
  • size0: defines the size of the part of the region that runs from offset up to the end of the ring buffer.
  • size1: defines the size of the part of the region that starts at the beginning of the ring buffer (after wrapping around).
  • fifoSize: defines the size of the ring buffer.
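The relationship between these variables can be written down in a few lines. The sketch below is a hypothetical mirror of the variables listed above (RegionLayout is not an SDK type); it only shows how size0 and size1 follow from offset, size and fifoSize.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical mirror of the variables listed above; not the SDK class.
struct RegionLayout {
    std::size_t offset;   // start of the region in the ring buffer
    std::size_t size;     // total number of elements in the region
    std::size_t fifoSize; // capacity of the ring buffer

    // Part of the region lying between offset and the end of the ring buffer.
    std::size_t Size0() const {
        assert(offset < fifoSize && size <= fifoSize);
        std::size_t toEnd = fifoSize - offset;
        return size < toEnd ? size : toEnd;
    }

    // Part of the region that wrapped around to the beginning of the ring buffer.
    std::size_t Size1() const { return size - Size0(); }
};
```

For instance, with fifoSize 8, offset 6 and size 5, two elements sit at the end of the buffer and three at its beginning.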

Here is an illustrated example of a raw data (in blue) and its associated MAPSRBRegion.

Add data to the ring buffer

In order to add fresh data to the ring buffer, you first have to Push, which reserves room for the data. This creates a new MAPSRBRegion (represented in red in the next figure).

Let's call fifo (hatched lines) the main MAPSRBRegion that holds the whole data, and pushedReg the new one (red).

Here is what the code would look like:

// Start reading input
MAPSIOElt *ioElt = StartReading(Input("iStream8"));
// Declare the region that will describe the freshly read data
MAPSRBRegion pushedReg;
// Push: only reserves the room in fifo, no data is copied yet
MAPSConstRBRegionState st = fifo.Push(ioElt->VectorSize(), pushedReg);
// Copy the input bytes into the raw buffer (data) at the place described by pushedReg
pushedReg.CopyToRB(data, ioElt->Stream8(), ioElt->VectorSize());

The Push function called on the main RBRegion (fifo) only reserves room: it sets up a new RBRegion (pushedReg) with the size we asked for, positioned on the main RBRegion. As a result, pushedReg is created as represented in the previous figure, i.e., in red.

Using this RBRegion, we can now copy the MAPSIOElt to the raw buffer using CopyToRB. This will copy data where the RBRegion is (red zone).
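To make the two steps concrete, here is a standalone sketch of "reserve, then copy" on a plain byte array. The Region struct and the free functions Push and CopyToRB below are hypothetical stand-ins that borrow the names used in this article; the signatures and return values of the real MAPSRBRegion API differ.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical stand-in for a region; the real MAPSRBRegion API differs.
struct Region {
    std::size_t offset = 0;
    std::size_t size = 0;
    std::size_t fifoSize = 0;
};

// Reserve `count` elements at the end of `fifo` and describe them in `pushed`.
// Returns false when there is not enough free room. No data is copied.
bool Push(Region& fifo, std::size_t count, Region& pushed) {
    if (fifo.fifoSize == 0 || count > fifo.fifoSize - fifo.size) return false;
    pushed.offset = (fifo.offset + fifo.size) % fifo.fifoSize;
    pushed.size = count;
    pushed.fifoSize = fifo.fifoSize;
    fifo.size += count;
    return true;
}

// Copy `count` bytes from linear memory into the ring storage at the place
// described by `r`, splitting the copy in two when the region wraps around.
void CopyToRB(const Region& r, unsigned char* storage,
              const unsigned char* src, std::size_t count) {
    assert(count <= r.size && r.offset < r.fifoSize);
    std::size_t toEnd = r.fifoSize - r.offset;
    std::size_t first = count < toEnd ? count : toEnd;
    std::memcpy(storage + r.offset, src, first);
    std::memcpy(storage, src + first, count - first);
}
```

Separating the reservation from the copy means the region alone decides where the bytes go; the same region could be used to fill parallel arrays (for example timestamps) at matching positions.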

Remove data from the ring buffer

Removing data from a ring buffer is quite simple: we just have to use the Pop function:

fifo.Pop(number_of_objects_to_remove);

Removing two objects would remove the two green elements in the next figure:

The first two elements are removed, modifying the offset (and the size0 here).
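The effect of a pop on the region variables can be sketched as follows. Region and Pop here are hypothetical stand-ins written for this illustration; the behaviour of the real Pop when asked to remove more elements than the region holds is not covered by this article, and clamping is just the choice made in the sketch.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a region; not the SDK class.
struct Region {
    std::size_t offset;
    std::size_t size;
    std::size_t fifoSize;
};

// Drop up to `count` elements from the front of the region.
// Returns the number of elements actually dropped.
std::size_t Pop(Region& r, std::size_t count) {
    assert(r.fifoSize > 0);
    if (count > r.size) count = r.size; // assumption: clamp instead of failing
    r.offset = (r.offset + count) % r.fifoSize; // the start moves forward, wrapping if needed
    r.size -= count;
    return count;
}
```

Note that no byte is touched: popping only moves the start of the region, which is why it is cheap.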

Copy (a part of) the ring buffer to a temporary linear data

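Suppose we want to copy the current content of the ring buffer (named fifo, see previous sections) into a fresh linear buffer for further processing.

// A std::vector (from <vector>) replaces the variable-length array, which is not standard C++
std::vector<unsigned char> raw_buffer(fifo.Size());
fifo.CopyFromRB(data, raw_buffer.data(), fifo.Size());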

That's all!
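What such a copy has to do internally can be sketched without the SDK. The Linearize function and Region struct below are hypothetical and written only for illustration; they show the two-part copy (end of the buffer first, then the wrapped part) that turns a region into contiguous memory.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-in for a region; not the SDK class.
struct Region {
    std::size_t offset;
    std::size_t size;
    std::size_t fifoSize;
};

// Return the content of the region as a contiguous vector, oldest element first.
std::vector<unsigned char> Linearize(const Region& r, const unsigned char* storage) {
    assert(r.offset < r.fifoSize && r.size <= r.fifoSize);
    std::vector<unsigned char> out(r.size);
    std::size_t toEnd = r.fifoSize - r.offset;
    std::size_t size0 = r.size < toEnd ? r.size : toEnd;
    // First part: from offset up to the end of the ring buffer.
    if (size0 > 0) std::memcpy(out.data(), storage + r.offset, size0);
    // Second part: what wrapped around to the beginning of the ring buffer.
    if (r.size > size0) std::memcpy(out.data() + size0, storage, r.size - size0);
    return out;
}
```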

Access to the ring buffer

As seen in the previous section, we can copy the ring buffer into a linear raw buffer. But we can also access any element through operator[], which returns the real index of the data, based on the RBRegion's offset.

fifo[offset] returns the real index, so that:

int d = data[fifo[offset]];

returns the corresponding data.
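The index translation itself is a one-liner. The sketch below uses a hypothetical Region struct (not the SDK class) and assumes the mapping is a plain modulo on the ring buffer size, which is the usual way to implement it.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a region; not the SDK class.
struct Region {
    std::size_t offset;
    std::size_t size;
    std::size_t fifoSize;

    // Map a logical position inside the region to an index in the underlying storage.
    std::size_t operator[](std::size_t i) const {
        assert(fifoSize > 0 && i < size);
        return (offset + i) % fifoSize;
    }
};
```

Because the operator returns an index rather than a value, the same expression can address any array that shares the layout of the ring buffer.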

Example: using MAPSStream8IOComponent to read data from the serial port

To read data coming from the serial port, RTMaps provides a ready-to-use parent class: MAPSStream8IOComponent. You have to inherit from it (you can select it in the wizard) and implement the NewDataCallback virtual member function. Here is the prototype:

virtual void MAPSStream8IOComponent::NewDataCallback(
    MAPSRBRegion &region,
    const unsigned char *data, 
    const MAPSTimestamp *timestamp, 
    const MAPSTimestamp *timeOfIssue);

This callback receives the RBRegion, the data, the timestamps and the times of issue. The same RBRegion is used to access all of them, since they are stored at the same positions. For example, data[region[0]] returns the first data byte and timestamp[region[0]] its associated timestamp.

Here is some typical code you may have to write inside NewDataCallback:

void MyComponent::NewDataCallback(MAPSRBRegion &region, const unsigned char *data, const MAPSTimestamp *timestamp, const MAPSTimestamp *timeOfIssue)
{
    int offset = 0;      // current offset
    int offsetStart = 0; // the beginning of the message
    int lastOffset = 0;  // last offset for a message
    int regionSize = region.Size();
    while (offset < regionSize) // walk through the region
    {
        unsigned char c = data[region[offset]]; // current value
        // your code: update lastOffset, offsetStart, processing, and so on...
        // Once a message is complete, extract it into a linear buffer:
        MAPSRBRegion finalSubRegion;
        int res = region.SubRegion(finalSubRegion, MAPSConstRBRegionSubRegionFromBeginning, offsetStart, offset - offsetStart);
        finalSubRegion.CopyFromRB(data, m_currentPacket, finalSubRegion.Size());
        ++offset; // move on to the next byte
    }
    region.Pop(lastOffset); // discard what has already been read
}

There is nothing really new here. We loop over the message, accessing it with the offset variable. Then we do some tests and processing (not shown here). When the message is complete, we extract its useful part with SubRegion and copy it to a linear raw buffer (m_currentPacket, which is a member variable). Once the processing is finished, we can Pop the part of the message that has already been read. And that's all!
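As an illustration of the "tests and processing" step, here is a standalone sketch that extracts complete messages from a ring region. It assumes, purely for the example, that messages are terminated by a newline character; Region and ExtractMessages are hypothetical names, and the real callback would use SubRegion, CopyFromRB and Pop instead of the manual loops shown here.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a region; not the SDK class.
struct Region {
    std::size_t offset;
    std::size_t size;
    std::size_t fifoSize;
    std::size_t operator[](std::size_t i) const { return (offset + i) % fifoSize; }
};

// Extract every complete '\n'-terminated message, then pop the consumed bytes.
// The '\n' framing is an assumption made for this example.
std::vector<std::string> ExtractMessages(Region& region, const unsigned char* data) {
    assert(region.fifoSize > 0);
    std::vector<std::string> messages;
    std::size_t offsetStart = 0; // beginning of the current message
    std::size_t lastOffset = 0;  // number of bytes belonging to complete messages
    for (std::size_t offset = 0; offset < region.size; ++offset) {
        if (data[region[offset]] != '\n') continue;
        // Message complete: copy [offsetStart, offset) into linear memory.
        std::string packet;
        for (std::size_t i = offsetStart; i < offset; ++i)
            packet.push_back(static_cast<char>(data[region[i]]));
        messages.push_back(packet);
        offsetStart = offset + 1;
        lastOffset = offsetStart;
    }
    // Pop only complete messages; a trailing partial message stays for the next call.
    region.offset = (region.offset + lastOffset) % region.fifoSize;
    region.size -= lastOffset;
    return messages;
}
```

Leaving the incomplete tail in the region is the important design point: the next call sees it again, followed by the bytes that arrived in the meantime.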

 

 

MAPSComponent::IsStarted and MAPS::IsRunning

MAPSComponent::IsStarted tells if a particular component is started while MAPS::IsRunning tells if the global diagram is in running state.

When you start a diagram, the RTMaps engine goes in run mode, so that MAPS::IsRunning returns true.

Then each component on the diagram is also set in run mode, and Birth is called. This means that IsStarted returns true before the component enters the Birth member function! IsStarted makes it possible to distinguish between the design mode and the execution mode of a particular component.

How to manage output buffers manually?

You don't always know the size of a vector at design time. In that case, you need to do the allocation of RTMaps buffers yourself. This article explains how to do it.

For the sake of simplicity, we use the ready-to-use example from the RTMaps SDK (Chapter 1 - maps_multiplier_4.cpp).

Declaration of the output

The MAPS_OUTPUT (see RTMaps macros: inputs, outputs and properties (part I)) is declared with a size of zero. This prevents RTMaps from allocating the output buffer, since the allocation will be done manually later.

MAPS_BEGIN_OUTPUTS_DEFINITION(MAPSmultiplier4)
    MAPS_OUTPUT("output",MAPS::Integer32,NULL,NULL,0)
MAPS_END_OUTPUTS_DEFINITION

Allocation of the buffer

The buffer allocation is done when we receive the first sample from the upstream component. At that time, we know everything about our input: VectorSize, BufferSize, etc. Here we are particularly interested in BufferSize, as it is the maximum size of the input data.

void MAPSmultiplier4::Core() 
{
    MAPSIOElt* ioEltIn = StartReading(Input(0)); // Read the input
    if (ioEltIn == NULL)
        return;
    if (m_firstTime) // First time we pass into Core
    {
        m_firstTime = false; // We won't pass anymore
        Output(0).AllocOutputBuffer(ioEltIn->BufferSize()); // Allocate the buffer
    }
    // Other code
}

m_firstTime is just a boolean member (initialized to true) used to do the allocation only once.
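The "allocate on the first sample" pattern is not specific to RTMaps. The sketch below reproduces it with a std::vector standing in for the output buffer; the Multiplier class and its members are hypothetical, and it assumes the maximum input size stays the same for the whole run.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a component whose output size is only known at run time.
class Multiplier {
public:
    explicit Multiplier(int factor) : factor_(factor) {}

    // Process one input vector; `bufferSize` is the maximum size the input can take.
    const std::vector<int>& Process(const std::vector<int>& in, std::size_t bufferSize) {
        assert(in.size() <= bufferSize);
        if (firstTime_) {             // first sample: the maximum size is now known
            firstTime_ = false;       // never allocate again
            out_.reserve(bufferSize); // single allocation, reused afterwards
            ++allocations_;
        }
        out_.clear();                 // empties the vector but keeps its capacity
        for (int v : in) out_.push_back(v * factor_);
        return out_;
    }

    int Allocations() const { return allocations_; }

private:
    int factor_;
    bool firstTime_ = true;
    int allocations_ = 0;
    std::vector<int> out_;
};
```

Sizing the buffer from the maximum input size rather than from the size of the first sample is what makes a single allocation sufficient.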

How to create a "parent" component?

Sometimes you may want to factor the common behavior of many classes into a common parent class. This is one of the strengths of object-oriented programming. This article shows how to do it in RTMaps.

Header code

The header code of a parent component must contain one of these two special macros:

  • MAPS_PARENT_COMPONENT_HEADER_CODE
  • MAPS_PARENT_COMPONENT_HEADER_CODE_WITHOUT_CONSTRUCTOR

Here is a basic example:

class MAPSParentComponent : public MAPSComponent
{
    MAPS_PARENT_COMPONENT_HEADER_CODE(MAPSParentComponent, MAPSComponent)
    // Other code
};

Source code

In the source code, the Dynamic function must be implemented, like this:

void MAPSParentComponent::Dynamic()
{
	// Static inputs, outputs, properties and actions instantiation for the parent
	// This concerns only
	int nbInputs=0,nbOutputs=0,nbProperties=0,nbActions=0;
	while(1)
	{
		if (inputsDefinition[nbInputs].name!=NULL)
			NewInput(inputsDefinition[nbInputs++]);
		else
			break;
	};
	while(1)
	{
		if (outputsDefinition[nbOutputs].name!=NULL)
			NewOutput(outputsDefinition[nbOutputs++]);
		else
			break;
	};
	while(1)
	{
		if (propertiesDefinition[nbProperties].name!=NULL)
			MAPSModule::NewProperty(propertiesDefinition[nbProperties++]);
		else
			break;
	};
	while(1)
	{
		if (actionsDefinition[nbActions].name!=NULL)
			MAPSModule::NewAction(actionsDefinition[nbActions++]);
		else
			break;
	};
}

This code simply recreates the inputs, outputs, properties and actions. You don't have to fully understand what's going on here. Just copy/paste that code and Bob's your uncle!
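For readers who do want to understand it, the four loops all follow the same idiom: walking a definition table until an entry with a null name is found. The standalone sketch below shows that idiom on a hypothetical Definition record; the real RTMaps definition tables carry more fields, and NewInput, NewOutput and the like do more than collect a name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical definition record; real RTMaps definition tables have more fields.
struct Definition {
    const char* name; // a null name marks the end of the table
};

// Collect the names of a table terminated by an entry whose name is null.
std::vector<std::string> CollectNames(const Definition* table) {
    assert(table != nullptr);
    std::vector<std::string> names;
    for (std::size_t i = 0; table[i].name != nullptr; ++i)
        names.push_back(table[i].name);
    return names;
}
```

The sentinel entry is what lets the loops run without knowing the table length in advance, so a table that lacks its terminating entry would make them read past the end.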