Working With a Stream
Setup
Before connecting to the Stream endpoint, a customer will need to first create a stream.
Disconnects
Validic will close a streaming connection for the following reasons:
- A streaming server is restarted. This usually accompanies a code deploy and is infrequent.
- Validic's network configuration changes, for example a load balancer restart or a network reconfiguration. These events are rare, and Validic will work diligently to notify customers of any change it expects to affect users.
The client is responsible for reestablishing the connection if a disconnect occurs. Validic will maintain a customer's checkpoint in a stream for 7 days. If a customer reconnects to their stream after 7 days, the customer will start consuming data events from the beginning of the stream.
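Because reconnecting is left to the client, a retry loop with a growing delay is one common way to handle it. The following is a minimal sketch, not Validic's recommended implementation: `connect_and_consume` is a hypothetical callable that opens the stream and blocks until the connection drops, and the 1-second base delay, 60-second cap, and 60-second "healthy connection" threshold are assumed values.

```python
import time
from typing import Callable, Iterator, Optional


def backoff_delays(base: float = 1.0, cap: float = 60.0) -> Iterator[float]:
    """Yield retry delays that double each time, never exceeding `cap`."""
    delay = base
    while True:
        yield min(delay, cap)
        delay = min(delay * 2, cap)


def consume_with_reconnect(
    connect_and_consume: Callable[[], None],
    sleep: Callable[[float], None] = time.sleep,
    max_attempts: Optional[int] = None,
) -> int:
    """Run `connect_and_consume` repeatedly, pausing between attempts.

    `connect_and_consume` (hypothetical) should block while the stream is
    open and return or raise when it closes. With max_attempts=None the
    loop never ends; otherwise the number of attempts made is returned.
    """
    delays = backoff_delays()
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        started = time.monotonic()
        try:
            connect_and_consume()
        except (ConnectionError, TimeoutError, OSError):
            pass  # treat any network failure as a disconnect and retry
        # A connection that stayed up for a while resets the delay sequence.
        if time.monotonic() - started > 60:
            delays = backoff_delays()
        sleep(next(delays))
    return attempts
```

Keeping the total retry time well under the 7-day checkpoint window avoids restarting from the beginning of the stream.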
Connection event handling
Connection events do not replay in the Streaming API. If you lose connectivity to a stream that publishes connection events, you will not receive events for any connection changes that occur while you are disconnected. You will need to check the custom marketplace or the user's profile to determine the current connection status for each user. After reconnecting to the stream, any changes that occur going forward will be reported in the stream.
Processing
Once connected to a stream, all data received by Validic will be transmitted in a data event. Additionally, a poke event will be sent every 5 seconds to maintain a heartbeat with the client.
Messages streamed by this API are JSON encoded. Keep in mind that the attributes of a JSON-encoded object are unordered - do not rely on fields appearing in any given order. Also keep in mind that your JSON parser should tolerate unexpected or missing fields.
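As an illustration of tolerant parsing, the sketch below reads fields by name with defaults rather than by position, and ignores anything it does not recognize. The function names `parse_payload` and `summarize` are hypothetical, and the set of fields picked out is only an example based on the sample events in this document.

```python
import json
from typing import Any, Dict, Optional


def parse_payload(raw: str) -> Optional[Dict[str, Any]]:
    """Decode one JSON payload; return None instead of raising on bad input."""
    try:
        obj = json.loads(raw)
    except json.JSONDecodeError:
        return None
    return obj if isinstance(obj, dict) else None


def summarize(event: Dict[str, Any]) -> Dict[str, Any]:
    """Pick out a few fields by name, with defaults for anything missing.

    Unknown fields are simply ignored, and field order never matters.
    """
    user = event.get("user") or {}
    source = event.get("source") or {}
    return {
        "id": event.get("id"),
        "type": event.get("type"),
        "source": source.get("type"),
        "uid": user.get("uid"),
        "metric_count": len(event.get("metrics") or []),
    }
```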
Events streamed through the connection are delimited by consecutive \n characters:
event: data
data: {"category":"daily","checksum":"3f5278b39427bb0dede1f13f4ba3d22c","created_at":"2017-03-28T00:08:28.718Z","end_time":"2017-03-28T03:59:59Z","id":"a7810c17f561c564547d0ab0498c6a97","log_id":"2017-03-27","metrics":[{"origin":"manual","type":"active_duration","unit":"min","value":0.0},{"origin":"manual","type":"basal_energy_burned","unit":"kcal","value":1467.0},{"origin":"manual","type":"distance","unit":"km","value":4.1},{"origin":"manual","type":"elevation","unit":"m","value":0.0},{"origin":"manual","type":"floors_climbed","unit":"count","value":0},{"origin":"manual","type":"steps","unit":"count","value":5713},{"origin":"manual","type":"energy_burned","unit":"kcal","value":2072.0}],"offset_origin":"profile","origin":"manual","segments":[],"source":{"type":"fitbit"},"start_time":"2017-03-27T04:00:00Z","type":"summary","user":{"organization_id":"58209c808a5da50001de6e2d","uid":"TY892FG61","user_id":"583efed88a5da50001a45083"},"utc_offset":-14400,"version":"1.0"}\n
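Most clients will rely on an existing SSE library for this framing, but the delimiting rule can be sketched by hand. The example below is an assumption-laden illustration, not a full SSE implementation: `EventSplitter` is a hypothetical helper that buffers incoming text, cuts it at blank lines, and reads the `event:` and `data:` fields shown above.

```python
from typing import Dict, List


class EventSplitter:
    """Buffers raw text chunks and returns each complete event it finds."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> List[Dict[str, str]]:
        """Add a chunk; return the events completed by it (possibly none)."""
        # Normalize CRLF on the whole buffer so a split "\r" + "\n" is handled.
        self._buffer = (self._buffer + chunk).replace("\r\n", "\n")
        events: List[Dict[str, str]] = []
        while "\n\n" in self._buffer:
            block, self._buffer = self._buffer.split("\n\n", 1)
            event = self._parse_block(block)
            if event["data"]:
                events.append(event)
        return events

    @staticmethod
    def _parse_block(block: str) -> Dict[str, str]:
        name = "message"  # default event name when no "event:" line is present
        data_lines: List[str] = []
        for line in block.split("\n"):
            if not line or line.startswith(":"):
                continue  # skip blank lines and comment lines
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # drop the single space after the colon
            if field == "event":
                name = value
            elif field == "data":
                data_lines.append(value)
        return {"event": name, "data": "\n".join(data_lines)}
```

A chunk boundary can fall anywhere, including in the middle of a field name, which is why the splitter keeps the unfinished tail in its buffer until the next call to `feed`.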
Connection Events
Each connection event sent from the Streaming API contains a JSON-encoded Validic standard object, which includes a type attribute defining whether the event represents a user connection (create) or disconnection (delete) from a data source via the Validic Marketplace. Connection events are reported by the Validic Marketplace for cloud-based (API) sources only. These messages are sent in near real time when the user completes a connection or disconnection via the marketplace. Data sources that use the Validic Mobile SDK do not report connection events. Note that the id included with each connection or disconnection event is unique to that event: no two events will have the same id value.
Connection event from Fitbit
{
  "connection": {
    "id": "631b71415e78120001328461",
    "source": {
      "type": "fitbit"
    },
    "profile": {
      "gender": "male",
      "metrics": [
        {
          "type": "body_height",
          "unit": "m",
          "value": 1.8288,
          "origin": "manual"
        },
        {
          "type": "body_weight",
          "unit": "kg",
          "value": 82.0,
          "origin": "unknown"
        }
      ]
    },
    "type": "create",
    "credentials": {
      "scope": [
        "weight",
        "respiratory_rate",
        "profile",
        "sleep",
        "settings",
        "nutrition",
        "oxygen_saturation",
        "heartrate",
        "social",
        "activity"
      ]
    },
    "user": {
      "user_id": "58d96aSAMPLE001a25d40",
      "uid": "demotestuser1",
      "organization_id": "58cb55c98SAMPLE01f75db3"
    },
    "created_at": "2022-02-25T14:49:15Z",
    "deleted_at": null,
    "version": "1.0"
  }
}
Connection event from other cloud-based sources
{
  "connection": {
    "id": "631b71415e78120001328462",
    "source": {
      "type": "garmin"
    },
    "type": "create",
    "credentials": {},
    "user": {
      "user_id": "58d96aSAMPLE001a25d40",
      "uid": "demotestuser1",
      "organization_id": "58cb55c98SAMPLE01f75db3"
    },
    "created_at": "2022-02-29T15:15:15Z",
    "deleted_at": null,
    "version": "1.0"
  }
}
Disconnection event from any cloud-based source
{
  "connection": {
    "id": "631b71415e78120001328461",
    "source": {
      "type": "fitbit"
    },
    "type": "delete",
    "user": {
      "user_id": "58d96aSAMPLE001a25d40",
      "uid": "demotestuser1",
      "organization_id": "58cb55c98SAMPLE01f75db3"
    },
    "created_at": "2022-02-25T14:49:15Z",
    "deleted_at": "2022-09-28T12:42:48Z",
    "deleted_reason": "User manually disconnected",
    "version": "1.0"
  }
}
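One way to use these events is to keep a running record of which sources each user currently has connected. The sketch below assumes the payload shape shown in the examples above; `apply_connection_event` and the `(uid, source type)` key are hypothetical choices for illustration.

```python
from typing import Any, Dict, Tuple

ConnectionKey = Tuple[str, str]  # (user uid, source type): an assumed key


def apply_connection_event(
    status: Dict[ConnectionKey, bool], payload: Dict[str, Any]
) -> None:
    """Update `status` in place from one connection event payload.

    True means the user is connected to the source, False means disconnected.
    Payloads missing the expected fields are ignored rather than raising.
    """
    conn = payload.get("connection") or {}
    uid = (conn.get("user") or {}).get("uid")
    source = (conn.get("source") or {}).get("type")
    kind = conn.get("type")
    if uid is None or source is None:
        return
    if kind == "create":
        status[(uid, source)] = True
    elif kind == "delete":
        status[(uid, source)] = False
```

Since connection events do not replay, a record built this way is only accurate while the client stays connected. After a disconnect it needs to be refreshed from the marketplace or the user's profile.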
Data Events
Each data event sent from the streaming API will contain a JSON encoded Validic standard object which will include a type attribute defining the standard object type. The data object could be one of the following types:
Types
- Summary
- Measurement
- Sleep
- Nutrition
- Workout
These messages will be sent in near real time as the data is received by Validic, in the order they were received from the source.
Unique Identifiers & Updates
Each data object contains a unique id that identifies the event it describes. When processing data events, you will receive the same id multiple times, most likely for summary data types. Because data is transmitted to the client in the order in which it was received, the last record received for an id is the most up-to-date record for the event.
When receiving an updated event, a customer that is persisting this data should use the id to find the existing record in their system. Validic provides a checksum in each event. A difference in checksum values represents modified data for records with the same id. If there is concern about out-of-order processing, a final comparison on the created_at value can be made, as the most recent created_at time represents the most up-to-date record.
Example
data: {
  "category":"daily",
  "checksum":"c9a1eb48cbf2504e6a7a1055238ae153",
  "created_at":"2017-03-29T15:31:33.959Z",
  "end_time":"2017-03-30T03:59:59Z",
  "id":"97a2d220620bfa419f8bb0228c5fac73",
  "log_id":"2017-03-29",
  "metrics":[
    {
      "origin":"manual",
      "type":"active_duration",
      "unit":"min",
      "value":0.0
    },
    {
      "origin":"manual",
      "type":"basal_energy_burned",
      "unit":"kcal",
      "value":897.0
    },
    {
      "origin":"manual",
      "type":"distance",
      "unit":"km",
      "value":1.76
    },
    {
      "origin":"manual",
      "type":"steps",
      "unit":"count",
      "value":2415
    },
    {
      "origin":"manual",
      "type":"energy_burned",
      "unit":"kcal",
      "value":1184.0
    }
  ],
  "offset_origin":"profile",
  "segments":[],
  "source":{
    "type":"fitbit"
  },
  "start_time":"2017-03-29T04:00:00Z",
  "type":"summary",
  "user":{
    "organization_id":"58cb55c98SAMPLE01f75db3",
    "uid":"demotestuser1",
    "user_id":"58d96aSAMPLE001a25d40"
  },
  "utc_offset":-14400,
  "user_notes": [],
  "version":"1.0"
}
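The update rules described under Unique Identifiers & Updates (match on id, compare checksum, fall back to created_at) can be sketched as a small upsert routine. This is an illustration under assumptions: the in-memory dictionary stands in for whatever datastore the customer uses, and `upsert` and its return labels are hypothetical names.

```python
from datetime import datetime
from typing import Any, Dict


def parse_ts(value: str) -> datetime:
    """Parse a timestamp such as 2017-03-29T15:31:33.959Z into an aware datetime."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def upsert(store: Dict[str, Dict[str, Any]], event: Dict[str, Any]) -> str:
    """Insert or update `event` in `store`, keyed by its id.

    Returns "inserted", "unchanged", "stale", or "updated" so callers can log
    what happened. `store` is a plain dict standing in for a real database.
    """
    existing = store.get(event["id"])
    if existing is None:
        store[event["id"]] = event
        return "inserted"
    if existing.get("checksum") == event.get("checksum"):
        return "unchanged"  # same id and same checksum: nothing was modified
    if parse_ts(event["created_at"]) < parse_ts(existing["created_at"]):
        return "stale"  # arrived out of order; the stored record is newer
    store[event["id"]] = event
    return "updated"
```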
Poke: This event is sent every 5 seconds to maintain a heartbeat between the streaming API and the client.
event: poke
data: {"ts":"2017-01-01T12:00:00.000Z"}
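Since a poke arrives every 5 seconds, a long silence is a reasonable signal that the connection has stalled. The sketch below shows one way a client might track this. `HeartbeatMonitor` is a hypothetical helper, and the 15-second timeout (three missed pokes) is an assumed threshold, not a value specified by Validic.

```python
import time
from typing import Callable


class HeartbeatMonitor:
    """Tracks how long it has been since the last event of any kind."""

    def __init__(
        self,
        timeout: float = 15.0,  # assumed: roughly three missed 5-second pokes
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._timeout = timeout
        self._clock = clock
        self._last_seen = clock()

    def record_event(self) -> None:
        """Call this whenever a poke or data event is received."""
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        """True when no event has arrived within the timeout."""
        return self._clock() - self._last_seen > self._timeout
```

A client could check `is_stale()` on a timer and, when it returns True, close the socket and reconnect.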
SSE Libraries
There are many libraries already built to process SSE streams. Validic doesn't maintain an SSE listener. It is up to the client to research and implement the SSE listener that works best within their environment. Any standard SSE listener should work with the Streaming API. Below are some examples:
- Ruby: https://github.com/af83/em-eventsource
- Java: https://jersey.github.io/documentation/latest/sse.html
- Go: https://github.com/r3labs/sse
- .NET: https://github.com/3ventic/EvtSource, https://www.nuget.org/packages/3v.EvtSource/