When using the Data Lake sink, incoming events are stored in an InfluxDB database.

Implementation

org.apache.streampipes.sinks.internal.jvm.datalake

The concrete implementation comprises a Data Lake class, a Data Lake Controller class, a Data Lake InfluxDB Client class and a Data Lake Parameters class. The code is basically the same as for the InfluxDB sink (org.apache.streampipes.sinks.databases.jvm.influxdb).

Data Lake Parameters Class

The parameter class defines the necessary parameters for the configuration of the sink.

parameter | description
influxDbHost | hostname/URL of the InfluxDB instance (including http(s)://)
influxDbPort | port of the InfluxDB instance
databaseName | name of the database where events will be stored
measureName | name of the measurement where events will be stored (will be created if it does not exist)
user | username for the InfluxDB server
password | password for the InfluxDB server
timestampField | field that contains the required timestamp (field type = http://schema.org/DateTime)
batchSize | number of events that are written into a buffer before they are written to the database
flushDuration | maximum waiting time (in ms) for the buffer to reach the batch size before it is written to the database
dimensionProperties | list containing the tag fields (scope = dimension property)
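
For illustration, a parameter holder along the lines of this table could look like the following minimal sketch (field names are taken from the table above; the actual DataLakeParameters class may differ in structure and naming):

import java.util.List;

// Minimal sketch of a parameter holder for the Data Lake sink (illustrative, not the actual class).
public class DataLakeParameters {

  String influxDbHost;              // hostname/URL of the InfluxDB instance, including http(s)://
  Integer influxDbPort;             // port of the InfluxDB instance
  String databaseName;              // database where events are stored
  String measureName;               // measurement where events are stored (created if missing)
  String user;                      // InfluxDB username
  String password;                  // InfluxDB password
  String timestampField;            // field carrying the timestamp (http://schema.org/DateTime)
  Integer batchSize;                // number of events buffered before writing
  Integer flushDuration;            // maximum buffer wait time in ms
  List<String> dimensionProperties; // tag fields (scope = dimension property)

  // constructor and getters omitted for brevity
}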

Data Lake Controller Class

The controller class declares the model for viewing and configuration in the Pipeline Editor and initializes the sink when a pipeline is invoked.

The measurement name and the timestamp field are derived from user input; the remaining parameters (except batch size and flush duration) are taken from org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig. The batch size is fixed at 2000 events and the flush duration is set to 500 ms.
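
The following minimal sketch illustrates how these parameters could be assembled on invocation, reusing the parameter holder sketched above. The hard-coded values are placeholders only: in the real controller they come from SinksInternalJvmConfig and from the user input collected in the Pipeline Editor.

// Illustrative sketch of how the controller could assemble the sink parameters on pipeline invocation.
// All literal values below are placeholders, not the actual defaults.
public class DataLakeControllerSketch {

  static DataLakeParameters buildParameters() {
    DataLakeParameters p = new DataLakeParameters();

    // taken from SinksInternalJvmConfig in the real implementation
    p.influxDbHost = "http://influxdb";
    p.influxDbPort = 8086;
    p.databaseName = "example_db";
    p.user = "example_user";
    p.password = "example_password";

    // derived from user input in the real implementation
    p.measureName = "my_measurement";
    p.timestampField = "timestamp";
    p.dimensionProperties = java.util.List.of();

    // fixed values as described above
    p.batchSize = 2000;
    p.flushDuration = 500;

    return p;
  }
}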

Data Lake Class

The data lake class itself essentially controls the saving of events to the database. For this purpose, it uses the Data Lake InfluxDB Client.

method name | description
onInvocation | starts the DataLakeInfluxDbClient; registers and initializes a new measurement series in InfluxDB
onEvent | adds an empty label field to the incoming event and stores the event in the database
onDetach | stops the DataLakeInfluxDbClient
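
A rough sketch of the per-event behaviour described in the table above, with the event represented as a plain map and the InfluxDB client as a generic consumer for illustration (the actual implementation works on the StreamPipes Event model and delegates to the DataLakeInfluxDbClient):

import java.util.Map;
import java.util.function.Consumer;

// Illustrative sketch: an empty label field is added to the incoming event before it is handed
// to the InfluxDB client for storage.
public class DataLakeSinkSketch {

  private final Consumer<Map<String, Object>> influxWriter; // stands in for the DataLakeInfluxDbClient

  public DataLakeSinkSketch(Consumer<Map<String, Object>> influxWriter) {
    this.influxWriter = influxWriter;
  }

  public void onEvent(Map<String, Object> event) {
    event.put("internal_label", ""); // placeholder name for the empty label field added by the sink
    influxWriter.accept(event);      // corresponds to DataLakeInfluxDbClient.save(...)
  }
}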

Image data, unlike other event data, is not stored directly in the database but as image files in a corresponding directory (writeToImageFile).
In addition, the class contains two utility methods (registerAtDataLake and prepareString).
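
Assuming the image is carried as a Base64-encoded string field, writing it out as a file could look roughly like the sketch below; the directory layout and file naming are assumptions, not the actual writeToImageFile implementation.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;

// Illustrative sketch: store a Base64-encoded image field as a PNG file instead of writing it to InfluxDB.
public class ImageFileWriterSketch {

  public static Path writeToImageFile(String base64Image, String imageDirectory, String fileName)
      throws IOException {
    byte[] imageBytes = Base64.getDecoder().decode(base64Image);
    Path target = Path.of(imageDirectory, fileName + ".png");
    Files.createDirectories(target.getParent());
    return Files.write(target, imageBytes);
  }
}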

Data Lake InfluxDB Client Class

Client class that connects to InfluxDB and writes events directly to the database. It uses the Data Lake Parameters described above.

method name | description
validate | checks whether the influxDbHost is valid
connect | connects to the InfluxDB server, sets the database and initializes the batch behaviour
databaseExists | checks whether the given database exists
createDatabase | creates a new database with the given name
save | saves an event to the connected InfluxDB database
stop | shuts down the connection to the InfluxDB server
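
The following is a minimal sketch of such a client based on the influxdb-java library (org.influxdb), covering connect, save and stop. The event layout is simplified compared to the actual DataLakeInfluxDbClient, and the handling of field types is reduced to numbers and strings.

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;

import java.util.Map;
import java.util.concurrent.TimeUnit;

// Minimal sketch of an InfluxDB client wrapper (illustrative, not the actual DataLakeInfluxDbClient).
public class DataLakeInfluxDbClientSketch {

  private final String measureName;
  private InfluxDB influxDb;

  public DataLakeInfluxDbClientSketch(String measureName) {
    this.measureName = measureName;
  }

  // connects to the InfluxDB server, creates/selects the database and enables batching
  public void connect(String url, String user, String password, String database,
                      int batchSize, int flushDurationMs) {
    influxDb = InfluxDBFactory.connect(url, user, password);
    influxDb.query(new Query("CREATE DATABASE \"" + database + "\"", ""));
    influxDb.setDatabase(database);
    influxDb.enableBatch(batchSize, flushDurationMs, TimeUnit.MILLISECONDS);
  }

  // writes one event as a point; the timestamp is taken from the event's timestamp field
  public void save(long timestampMs, Map<String, Object> fields, Map<String, String> tags) {
    Point.Builder point = Point.measurement(measureName)
        .time(timestampMs, TimeUnit.MILLISECONDS)
        .tag(tags);
    fields.forEach((name, value) -> {
      if (value instanceof Number) {
        point.addField(name, (Number) value);
      } else {
        point.addField(name, String.valueOf(value)); // non-numeric values stored as strings (simplified)
      }
    });
    influxDb.write(point.build());
  }

  // flushes pending points and shuts down the connection
  public void stop() {
    influxDb.close();
  }
}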

TODO:

  • validate(): use a validation method (org.apache.commons.validator.routines.InetAddressValidator) instead of the regex check (see the sketch below)
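
A possible shape of this check, as suggested in the TODO; note that InetAddressValidator only validates IP addresses, so plain hostnames would still need a separate check:

import org.apache.commons.validator.routines.InetAddressValidator;

// Sketch: validating the configured host with commons-validator instead of a regex.
public class HostValidationSketch {

  public static boolean isValidHost(String influxDbHost) {
    // strip the protocol prefix, since influxDbHost includes http(s)://
    String host = influxDbHost.replaceFirst("^https?://", "");
    return InetAddressValidator.getInstance().isValid(host); // true for valid IPv4/IPv6 addresses
  }
}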


REST API

DataLakeNoUserResourceV3

org.apache.streampipes.rest.impl.datalake

This class contains the basic interface definition for setting up a new measurement series in the Data Lake and calls the underlying methods of org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3. Usage of the related API calls does not require authentication with a valid username and password.

method name | request type | path | description
addDataLake | POST | /{measure} | adds a new measurement series with the specified measure name and related event properties (column names) in InfluxDB
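
For illustration, registering a new measurement series via this endpoint could be called as in the sketch below (using Java's built-in HttpClient). The base URL, API prefix and the JSON body representing the event schema are assumptions and depend on the concrete deployment.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: calling the unauthenticated addDataLake endpoint to register a measurement series.
// Base URL, API prefix and payload shape are assumptions for illustration only.
public class AddDataLakeCallSketch {

  public static void main(String[] args) throws Exception {
    String measureName = "my_measurement";
    String eventSchemaJson = "{ ... }"; // JSON description of the event properties (column names)

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8030/api/v3/noauth/datalake/" + measureName))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(eventSchemaJson))
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}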


TODO:

  • [STREAMPIPES-348]: fix issue with special characters in user-defined measure name
  • add authentication obligation to addDataLake method

DataLakeResourceV3

org.apache.streampipes.ps

This class contains the extended interface definition and calls the underlying methods of org.apache.streampipes.dataexplorer.DataLakeManagementV3 and org.apache.streampipes.dataexplorer.utils.DataExplorerUtils when invoked. Usage of the API calls listed below requires authentication with a valid username and password.

method name | request type | path | description
getPage | GET | /data/{index}/paging | returns pages with a predefined number of events per page of a specific measurement series from InfluxDB
getAllInfos | GET | /info | returns a list with the ids of all existing measurement series (including event schema) from InfluxDB
getAllData | GET | /data/{index} | returns all stored events of a specific measurement series from InfluxDB
getAllData | GET | /data/{index}/last/{value}/{unit} | returns an aggregated set of all stored events of a specific measurement series from InfluxDB
getAllData | GET | /data/{index}/{startdate}/{enddate} | returns all stored events within the specified time frame of a specific measurement series from InfluxDB
getAllDataGrouping | GET | /data/{index}/{startdate}/{enddate}/grouping/{groupingTag} | returns all events within a specified time frame of a specific measurement series grouped by a specific tag from InfluxDB
removeAllData | DELETE | /data/delete/all | removes all stored events from InfluxDB
downloadData | GET | /data/{index}/download | downloads all events of a specific measurement series from InfluxDB in the desired format
downloadData | GET | /data/{index}/{startdate}/{enddate}/download | downloads all events within a specified time frame of a specific measurement series from InfluxDB in the desired format
getImage | GET | /data/image/{route}/file | returns a png image from the file route
saveImageCoco | POST | /data/image/{route}/coco | stores an image as a file at the file route
getImageCoco | GET | /data/image/{route}/coco | returns the image at the file route as application/json
labelData | POST | /data/{index}/{startdate}/{enddate}/labeling/{column}/{timestampColumn}?label= | updates the label in the specified column for all events within the specified time frame to the provided label value
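
As an example, fetching all events of a measurement series within a time frame might look like the following sketch. The base URL, API prefix and the authentication header are placeholders, since the concrete authentication mechanism depends on the StreamPipes deployment.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: querying the authenticated getAllData endpoint for a time frame.
// Base URL, API prefix and auth header are placeholders, not the actual values.
public class GetAllDataCallSketch {

  public static void main(String[] args) throws Exception {
    String index = "my_measurement";
    long startDate = 1609455600000L; // start of the time frame in ms
    long endDate = 1609459200000L;   // end of the time frame in ms

    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8030/api/v3/datalake/data/"
            + index + "/" + startDate + "/" + endDate))
        .header("Authorization", "Bearer <token>") // placeholder for the required authentication
        .GET()
        .build();

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}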


TODO:

  • fix export of data from data lake, which currently returns two timestamp fields
  • extend the aggregation functionality to support non-numeric values (e.g. strings → majority vote) and add the possibility to specify an aggregation function
  • in general: align the individual endpoint definitions and consider the extensions below


Ideas for possible adaptations and extensions of the REST API

In addition to the TODOs listed above, the following adjustments and enhancements might be worth considering. It is important that the implementation of the endpoints is as independent as possible of the underlying data lake technology (e.g. avoiding InfluxDB-specific methods).

  • Extension of the remove endpoint by the capability to
    • selectively delete an individual measurement series
    • delete measurements of a measurement series within a specific time interval or before a specific date
  • Adding an edit endpoint for adjusting data lake specific properties such as retention time.

Both extensions could be included in a kind of data management tool in the UI within an admin view (e.g. in the manner of the pipeline view).

Another possible adaptation would be the comprehensive implementation of an append-only approach for time series data. In particular, the functionality of the labelData method would have to be adapted here, which currently works with updates of existing DB entries.





9 Comments

  1. Hi,

    here are some comments from my side.

    Data Lake Controller Class

    • The batch size and flush duration should at least be configurable via the container configs


    DataLakeResourceV3 

    • the endpoints have no authentication because they are called from the DataLakeSink and of course the user/password is not known there. In order to add authentication, a concept with technical users would probably have to be introduced.

    DataLakeResourceV3

    • Here we use a lot of path parameters that are actually query parameters. E.g.

      • /data/{index}/{startdate}/{enddate}/grouping/{groupingTag} → /data/{index}?startdate=...&enddate=...&groupby=...

      • /data/delete/all → /data?index=... (no index = delete all)


    General - Saving and Reading Images and Coco Files

    • It would also be good to think about the file sharing part. The way we do it now, it only works because we use volume sharing. Maybe we should use blob storage for this kind of data? 


    1. Hi Johannes,

      thanks for your thoughts on possible adaptations / extensions.
      Especially the configurability of flush duration and batch size is a good idea that we definitely should include.

      Your point about introducing a technical user is also a good one. Maybe in this context it also makes sense to think about extending the token-based authentication (service API key), which is already used for "external" services. This mechanism could be used to implement token-based authentication at the Data Lake sink instead of using a technical user. The token would then have to be set as an environment variable, for example. What do you think about that solution? Or would it be better to use an explicit technical user?

  2. Hi Daniel,

    thanks for coming up with this!


    Regarding authentication, I'm currently working on improving the communication between pipeline elements and core services. The idea is to provide an instance of the StreamPipes client within the onInvocation method so that pipeline elements requesting anything from the core APIs can directly interact with these using the aforementioned service token. In this case, the data lake sink would just call the platform service API in order to create a new measure or do anything else in the core (e.g., sending emails or creating live visualizations). For that, we need to extend our Auth concept with a service user. I'm also trying to get rid of the Consul config provider and integrate this directly into the description of a pipeline element.

    After that, we can simply move the data lake API to the platform services module and make the endpoint authenticated.

    In the meantime, we can work on improving the REST interface itself, e.g., the comment from Johannes concerning the usage of query parameters instead of path parameters seems very important to me. So discussing how to improve the DataLakeRestResourceV3 (let's say the way towards v4) should be the next step IMO.

    From a user perspective, it should be easily understandable how to get data, filter and group data, upload/download/list images and so on. Do you have any first suggestion on how to redesign this endpoint? Otherwise, I can provide some first input.

    Dominik


  3. Hi,
    I have created a first draft for the revised endpoint definitions (see table below). As suggested, I have tried to map as much as possible via query parameters. However, the user should, for example, have the possibility to specify multiple grouping tags for the "groupBy" parameter.

    I kept the file sharing part for the image / coco files very prototypical and assumed a simple object store (key-value store). Here we can also discuss the solution mentioned by Johannes using a blob storage and adapt the draft accordingly.

    I am interested in your comments and suggestions for adjustments.



    Suggestion for the revised endpoint definitions

    functionality | request type | path | query parameters

    listing all existing measurement series (incl. event schema) | GET | /list | -

    getting measurements from a specified measurement series | GET | /data/{index}
    • startDate (optional): start date, if not specified: first element
    • endDate (optional): end date, if not specified: last element
    • groupBy (optional): grouping tags
    • aggregationFunction (optional): aggregation function (e.g. mean)
    • timeInterval (optional): time interval for aggregation (e.g. 1m - one minute), if specified: group by time is performed

    getting a specific page with a predefined number of events per page of a specific measurement series | GET | /data/{index}/paging
    • page: page number
    • itemsPerPage: data points per page

    downloading events of a specific measurement series in the desired format | GET | /data/{index}/download
    • format (default "csv"): download format (csv, json)
    • startDate (optional): start date, if not specified: first element
    • endDate (optional): end date, if not specified: last element

    removing measurements from a specific measurement series | DELETE | /data/{index}/delete
    • startDate (optional): start date, if not specified: first element
    • endDate (optional): end date, if not specified: last element

    removing any or a specific measurement series | DELETE | /clear
    • index (optional): index of the measurement series

    labeling of measurements | POST | /data/{index}/labeling
    • startDate: start date, if not specified: first element
    • endDate: end date, if not specified: last element
    • column: label column
    • label: class label

    specifying configuration parameters of the data lake | POST | /edit
    • parameter_name = value (e.g. retention_time)

    getting a list with all available files (e.g. key + metadata) | GET | /files/list
    • type (optional): file type (image, COCO)

    getting / saving a specific file (addressable via key) | GET / POST | /files
    • key

    removing a specific file (addressable via key) | DELETE | /files/delete
    • key
    1. Hi Daniel,

      that already looks great (smile)


      I would suggest changing the API paths a little to make it a little more RESTful:

      request type | path | query parameters / payload (my comments inline)

      GET / DELETE | /measurement | -

      GET / DELETE | /measurement/{index} | -
      How many measurements will it return? All? That can be a lot ;)
      What do you think about a "limit" query param?

      GET | /measurement/{index}/data
      • startDate: optional
      • endDate: optional
      • groupBy: optional
      • aggregationFunction:
      • timeInterval: optional

      GET | /measurement/{index}/data/page
      • page:
      • itemsPerPage:

      What is the difference to /measurement/{index}/data?
      Maybe it is possible to use the same API, just add the format query param?

      GET / DELETE | /measurement/{index}/data/download
      • format:
      • startDate: optional
      • endDate: optional

      POST / GET / DELETE | /measurement/{index}/label
      • startDate:
      • endDate:
      • column:
      • label:

      GET | /measurement/{index}/label

      PUT (updating) | /configuration
      • pair values, e.g. { retention_time: 10, ... }

      GET | /configuration

      GET / DELETE / POST | /file
      • type: optional

      GET / DELETE | /files/{key}

  4. Looks good!

    A few comments/questions from my side:

    • Can we integrate the paged endpoint into the standard endpoint? E.g., by having offset and limit parameters?
    • Multiple grouping tags in the groupingTag query parameter can be comma-separated
    • Can we merge the download feature into the standard endpoint? E.g., by providing a query parameter action=download and another parameter format=csv 
    • Do we already have or foresee an endpoint to upload an image file? That would be something I'd currently love to have (wink)

    I'm not sure whether the multiple query endpoints for paged, grouped and non-grouped queries return the same schema - that would be something we would need to harmonize in case there is a single endpoint.

    Regarding the comment from Johannes, what is the difference between measurement and index?

    And regarding naming of resources as raised by Johannes, a common best practice is to have an endpoint such as /measurements (using plural) where a GET would return a list of all measurements and /measurements/{measurementId} (maybe this would be index) that returns the content for the specific source. In that case, to delete a specific measurement, a DELETE call to /measurements/{measurementId} could be available. Hope this helps!


    And another, more high-level comment regarding the roadmap: In the near future, I'd really like to simplify some StreamPipes concepts. One idea here is that everything in StreamPipes is either a Data Set or a Data Stream, which can be configured to be persisted (without manually creating a pipeline). For pipelines that transform data streams that should be stored in the data lake, there would be two "Virtual Data Stream/Set" sinks instead of the data lake/dashboard sink, which themselves represent a stream/set in the UI that can be configured to be persisted. In this case, any resource in the data lake could be discovered by the identifier (e.g., appId or some other identifier) of the underlying data stream/set. Maybe we can already foresee that in the naming of resources for this API redesign.


  5. Thanks for your helpful input. I have incorporated your comments into the draft (see table below).

    As suggested, I have left the file sharing /storage part out for now. However, this is also an exciting and important topic that we should address.

    @Dominik: As far as I know, the image upload is currently done via the image connect adapter (set: org.apache.streampipes.connect.adapters.image.set, stream: org.apache.streampipes.connect.adapters.image.stream). I have not worked with it yet. But it would certainly be nice to have an endpoint for it.

    Regarding the high-level roadmap you mentioned, we can foresee the use of a UUID (or similar) as the measurementID. However, imo it would then make sense to write additional metadata about the measurement series to the data lake when creating a persistent adapter (e.g. a user-assigned plaintext name) for user convenience. I think, however, that InfluxDB only supports measurement-level metadata in the form of tags, which would generate too much overhead if this additional information had to be written each time. Please correct me here if I am wrong. Does the resource naming fit then, or do you have a concrete suggestion for renaming?


    functionality | request type | path | query parameters | payload

    listing all existing measurement series (incl. event schema) | GET | /measurements | - | -

    removing all existing measurement series | DELETE | /measurements | - | -

    getting data points of a specific measurement series | GET | /measurements/{measurementID}
    slicing data by timestamp criterion:
    • startDate: start date, if not specified: first element
    • endDate: end date, if not specified: last element
    paging:
    • page: page number
    • itemsPerPage / limit: limit rows per page
    • offset: specify offset (time_offset)
    grouping:
    • groupBy: grouping tags (comma-separated)
    • aggregationFunction: aggregation function (e.g. mean)
    • timeInterval: time interval for aggregation (e.g. 1m - one minute), if specified: group by time is performed
    downloading (can be combined with slicing operators):
    • format: data format (csv, json)

    removing a specific measurement series or selected measurements from that series | DELETE | /measurements/{measurementID}
    • startDate (optional): start date, if not specified: first element
    • endDate (optional): end date, if not specified: last element

    labeling of measurements | POST | /measurements/{measurementID}/labeling
    • startDate: start date, if not specified: first element
    • endDate: end date, if not specified: last element
    • column: label column
    • label: class label

    specifying configuration parameters for a specific measurement series | POST | /measurements/{measurementID}/configuration | -
    payload: key/value pairs {parameter_name: value}

    getting current configuration parameters for a specific measurement series | GET | /measurements/{measurementID}/configuration | - | -
  6. Hi Daniel,

    thanks, looks very good to me!

    I'd say we can keep the user-defined index name as the measurementId by now and switch it to the unique ID of the corresponding data stream once we've integrated data streams and their persistence. In that case, the metadata would directly come from the data stream description and users would only interact with the name of the data stream in the data explorer and dashboard without the need to explicitly assign/know the index name.

    So from my point of view, this API redesign is ready for entering the implementation phase (wink)