Get Pipeline Offset and Lag

Get the Kafka Offset and Lag of a specific stream processing pipeline.

Prerequisites

A stream processing pipeline is created with the Stream Processing service, and the status of the pipeline is RUNNING.

Request Format

GET https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}/offset-lag

Request Parameters (URI)

Name

Location (Path/Query)

Mandatory/Optional

Data Type

Description

pipelineId

Path

Mandatory

String

The stream processing pipeline ID, which can be found on the EnOS Management Console > Stream Processing > Stream Operation page.

orgId

Query

Mandatory

String

The organization ID. How to get the orgId>>

Response Parameters

Name

Data Type

Description

data

List<JSONObject>

The Offset and Lag information of the stream processing pipeline. For details, see data

data

Name

Data Type

Description

pipelineId

String

Stream processing pipeline ID.

topicName

String

Kafka Topic name.

lagAndOffset

JSONObject

List of lag and offset information of the specified stream processing pipeline. For details, see Lag and Offset

Lag and Offset

Name

Data Type

Description

lag

String

Lag of the current time.

offset

String

Offset of the current time.

ts

Long

Timestamp.

Error Code

Code

Error Information

Description

61108

Stream processing job does not exist.

Stream processing pipeline does not exist. Please check the pipeline ID.

99000

Internal Server Error.

Internal service error.

Sample

Request Sample

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}/offset-lag?orgId=yourOrgId

method: GET

Return Sample

{
  "code": 0,
  "msg": "OK",
  "data": {
    "topicName": "MEASURE_POINT_INTERNAL_o15517683199241",
    "lagAndOffset": [
      {
        "lag": 0,
        "offset": 6094977,
        "ts": 1616759448328
      },
      {
        "lag": 0,
        "offset": 6094977,
        "ts": 1616759673388
      }
    ],
    "pipelineId": "565c0114-9585-4372-86d1-50dde5ab8ce4"
  }
}

SDK Samples


You can access the SDK samples for stream processing service on GitHub: