CSV Message Parser

Data Source Service CSV Message Parser Documentation

1. Platform and stream naming configuration capabilities

This Data Parser allows you to subscribe to an MQTT topic and ingest lines of text formatted as CSV.

  • All Platform and Stream identifiers will be prefixed with dsapi-<friendly_id>-, where the friendly_id is generated by the Data Source system.
  • The Platform name will be the same as the Platform identifier, but without the dsapi-<friendly_id>- prefix.
  • The generated platform and stream identifiers are configurable and can be constructed from tokens of the topic name and static content.
  • Stream identifiers will be prefixed with the platform identifier, given a configurable infix and given the configured header label as a suffix.
  • This allows a single MQTT connection to ingest data streams for multiple platforms using a flexible naming convention.
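
The convention above can be sketched as simple string assembly (the function and argument names here are illustrative, not part of the service):

```python
def platform_id(friendly_id: str, platform_suffix: str) -> str:
    """All identifiers carry the generated dsapi-<friendly_id>- prefix."""
    return f"dsapi-{friendly_id}-{platform_suffix}"

def stream_id(friendly_id: str, platform_suffix: str,
              infix: str, header_label: str) -> str:
    """Streams extend the platform identifier with an infix and the
    configured header label as a suffix."""
    return f"{platform_id(friendly_id, platform_suffix)}.{infix}.{header_label}"
```

For example, stream_id("abc123", "ns.unit_1", "sensor_x", "BattV_volts") yields dsapi-abc123-ns.unit_1.sensor_x.BattV_volts.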

2. Tokenising a topic name

When configuring the parser to use topic tokens in the platform and stream names, it's important to understand how the topic is tokenised:

  • First, all "/" characters in the topic are converted to ".".
  • The topic name is then tokenised with "." as the delimiter.
  • Tokens are indexed from 1, so the first token (reading from left to right) can be inserted into a platform or stream id with the text "[1]".
  • To index tokens from right to left, use a negative sign. E.g. the last token can be inserted into a platform or stream id with the text "[-1]".
  • The index "[0]" inserts the entire "."-delimited version of the topic.
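
These rules can be sketched as follows (illustrative code, not the service's implementation):

```python
import re

def expand_pattern(pattern: str, topic: str) -> str:
    """Replace [n] references in a pattern with tokens from the topic."""
    dotted = topic.replace("/", ".")   # "/" characters become "."
    tokens = dotted.split(".")         # tokenise with "." as the delimiter

    def repl(match: "re.Match") -> str:
        index = int(match.group(1))
        if index == 0:
            return dotted              # [0] inserts the whole dotted topic
        if index > 0:
            return tokens[index - 1]   # [1] is the first token
        return tokens[index]           # [-1] is the last token

    return re.sub(r"\[(-?\d+)\]", repl, pattern)
```

For example, expand_pattern("csv.[-2]", "application/test/platform_name/message_type") produces "csv.platform_name".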

3. Parser Configuration

The default naming convention assumes the last two "."-separated tokens of the topic are of the form platform_name/message_type. Custom configuration of the topic_to_platform_id_suffix_pattern, topic_to_message_type_pattern and stream_id_suffix_pattern parameters can handle different naming conventions.

topic_to_platform_id_suffix_pattern (Optional)

Default is "csv.[-2]", which inserts the second from last token in the topic.

E.g. the topic name application/test/platform_name/message_type would result in the platform_id dsapi-<kodenamize_id>-csv.platform_name

topic_to_message_type_pattern (Optional)

Default is "[-1]", which inserts the last token in the topic.

E.g. the topic name application/test/platform_name/message_type would yield a message_type of message_type

The result is used to match with the <<message_type_id>> keys in csv_message_types

csv_message_types

A map where <<message_type_id>> is the key for the message definition.

MESSAGE_TYPE_ID

  • time_zone (Optional): A Joda time zone id. Default is UTC. E.g. "Australia/Hobart", "UTC", "+1000"
  • time_format (Optional): A Joda DateTime format used to parse the timestamp column value. Default is ISO 8601 format.
  • column_delimiter (Optional): Default is ","
  • line_delimiter (Optional): Default is "\n"
  • sample_period (Optional): Typical time duration between samples, formatted as ISO 8601. Default is null
  • reporting_period (Optional): Typical time duration between messages uploading a batch of samples, formatted as ISO 8601. Default is null
  • headers: A list of csv columns

It is assumed that the first column is the timestamp, so explicit configuration of that column isn't required.
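
A minimal sketch of how these settings could drive the decoding of one message body (illustrative only; the real parser additionally applies time_zone and time_format when parsing the timestamp):

```python
def parse_message(body, headers, column_delimiter=",", line_delimiter="\n"):
    """Split a CSV message into (timestamp, {header_label: value}) records."""
    records = []
    for line in body.split(line_delimiter):
        if not line.strip():
            continue  # skip blank lines, e.g. a trailing line delimiter
        columns = line.split(column_delimiter)
        # The first column is always the timestamp; no header entry needed.
        records.append((columns[0], dict(zip(headers, columns[1:]))))
    return records
```

For example, parse_message("2024-01-01T00:00:00Z,12.4,21.0", ["BattV_volts", "PTemp_C"]) yields one record with two labelled values.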

4. Simple configuration example

For this configuration example, if we receive valid CSV on the following topics:

  • paddock_south_unit_1/sensor_x
  • paddock_south_unit_2/sensor_x
  • paddock_north_unit_3/sensor_x

Then the following platforms and streams will be created:

  • dsapi-<kodenamize_id>-your_org_and_project_namespace.paddock_south_unit_1
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.paddock_south_unit_1.sensor_x.BattV_volts
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.paddock_south_unit_1.sensor_x.PTemp_C
  • dsapi-<kodenamize_id>-your_org_and_project_namespace.paddock_south_unit_2
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.paddock_south_unit_2.sensor_x.BattV_volts
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.paddock_south_unit_2.sensor_x.PTemp_C
  • dsapi-<kodenamize_id>-your_org_and_project_namespace.paddock_north_unit_3
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.paddock_north_unit_3.sensor_x.BattV_volts
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.paddock_north_unit_3.sensor_x.PTemp_C

Configuration JSON:

{
  "topic_to_platform_id_suffix_pattern": "your_org_and_project_namespace.[-2]",
  "csv_message_types": {
    "sensor_x": {
      "headers": [
        {
          "label": "BattV_volts"
        },
        {
          "label": "PTemp_C"
        }
      ]
    }
  }
}
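
Putting the pieces together, the topic-to-identifier mapping under this configuration can be sketched as follows (ids_for_topic is an illustrative helper, not part of the service; it assumes the default message type pattern "[-1]"):

```python
import re

def ids_for_topic(topic, platform_pattern, message_pattern, headers,
                  kodenamize_id="<kodenamize_id>"):
    """Derive the platform id and stream ids a topic would map to."""
    tokens = topic.replace("/", ".").split(".")
    def expand(pattern):
        def repl(m):
            i = int(m.group(1))
            return tokens[i - 1] if i > 0 else tokens[i]
        return re.sub(r"\[(-?\d+)\]", repl, pattern)
    platform = f"dsapi-{kodenamize_id}-{expand(platform_pattern)}"
    message_type = expand(message_pattern)
    streams = [f"{platform}.{message_type}.{label}" for label in headers]
    return platform, streams
```

Called with topic paddock_south_unit_1/sensor_x, pattern "your_org_and_project_namespace.[-2]" and headers ["BattV_volts", "PTemp_C"], this reproduces the first platform and its two streams listed above.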

Command line tests (Bash):

export SENAPS=senaps.eratos.com && export YOUR_USERNAME=<TODO> && export YOUR_PASSWORD=<TODO> && export KODENAMIZE_ID=<TODO>
t=$(date -u +%Y-%m-%d'T'%T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${KODENAMIZE_ID}:${YOUR_USERNAME}" -P "${YOUR_PASSWORD}" -t paddock_south_unit_1/sensor_x -m "${t},20,$((1 + $RANDOM % 10))"

5. Full feature configuration example

For this configuration example, if we receive valid CSV on the following topics:

  • application/paddock_south/unit_1/sensor_x
  • application/paddock_south/unit_1/sensor_y
  • application/paddock_south/unit_2/sensor_x
  • application/paddock_north/unit_3/sensor_x

Then the following platforms and streams will be created:

  • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_south.unit_1
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_south.unit_1.sensor_x.BattV_volts
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_south.unit_1.sensor_x.PTemp_C
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_south.unit_1.sensor_y.rain_count
  • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_south.unit_2
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_south.unit_2.sensor_x.BattV_volts
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_south.unit_2.sensor_x.PTemp_C
  • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_north.unit_3
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_north.unit_3.sensor_x.BattV_volts
    • dsapi-<kodenamize_id>-your_org_and_project_namespace.application.paddock_north.unit_3.sensor_x.PTemp_C

Configuration JSON:

{
  "topic_to_platform_id_suffix_pattern": "your_org_and_project_namespace.[1].[-3].[-2]",
  "topic_to_message_type_pattern": "[-1]",
  "csv_message_types": {
    "sensor_x": {
      "time_zone": "UTC",
      "time_format": "yyyy-MM-dd HH:mm:ss",
      "sample_period": "PT1H",
      "reporting_period": "PT1H",
      "headers": [
        {
          "label": "BattV_volts",
          "stream_metadata": {
            "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
            "type": ".ScalarStreamMetaData"
          }
        },
        {
          "label": "PTemp_C",
          "stream_metadata": {
            "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/Continuous",
            "type": ".ScalarStreamMetaData"
          }
        }
      ]
    },
    "sensor_y": {
      "time_zone": "UTC",
      "time_format": "yyyy-MM-dd HH:mm:ss",
      "sample_period": "PT1H",
      "reporting_period": "PT1H",
      "headers": [
        {
          "label": "rain_count",
          "stream_metadata": {
            "interpolationType": "http://www.opengis.net/def/waterml/2.0/interpolationType/TotalPrec",
            "type": ".ScalarStreamMetaData"
          }
        }
      ]
    }
  }
}

Command line tests (Bash):

export SENAPS=senaps.eratos.com && export YOUR_USERNAME=<TODO> && export YOUR_PASSWORD=<TODO> && export KODENAMIZE_ID=<TODO>
t=$(date -u +%Y-%m-%d\ %T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${KODENAMIZE_ID}:${YOUR_USERNAME}" -P "${YOUR_PASSWORD}" -t application/paddock_south/unit_1/sensor_x -m "${t},20,$((1 + $RANDOM % 10))"
t=$(date -u +%Y-%m-%d\ %T);mosquitto_pub -h $SENAPS -p 1883 -u "dsapi-${KODENAMIZE_ID}:${YOUR_USERNAME}" -P "${YOUR_PASSWORD}" -t application/paddock_south/unit_1/sensor_y -m "${t},66"

6. Initialisation checks

If the parser has successfully passed configuration validation and connected to a data source, you will see the log message:

CSV Parser has started and is ready for incoming messages