Query Guide-Stream
  ha7vY1zXUFvq 2023年11月02日 54 0

事件流

事件流定义包含流名称和一组属性,这些属性具有特定类型和流范围内唯一可识别的名称。

目的

       接收事件的输入,接收查询处理结果的输出。

语法

define stream <stream

                             <attribute name> <attribute type>, ... );

以下参数用于配置流定义

参数

描述

<stream name>

流名称

<attribute name>

参数名称

<attribute type>

参数类型(STRING, INT, LONG, DOUBLE, FLOAT, BOOL or OBJECT)

例子

define stream TempStream (deviceID long, roomNo int, temp double);

来源Source

源通过多种传输和各种数据格式接收事件,并将它们引导到流中进行处理。

源配置允许定义映射,以便将每个传入事件从其本地数据格式转换为Siddhi事件。当没有提供对此类映射的自定义时,Siddhi假设到达的事件遵循基于流定义和配置的消息映射类型的预定义格式。

目的

提供了一种使用外部系统事件并将其转换为流处理的方式。

语法

Query Guide-Stream_JSON

@source annotation的type参数定义了接收事件的源类型。

@source annotation的其他参数取决于所选的源类型,其中一些参数可以是可选的。

以下是Siddhi支持的源类型列表:

源类型

描述

In-memory

允许SiddhiApp使用在同一JVM上运行的其他SiddhiApp中的事件。

HTTP

HTTP服务

Kafka

订阅Kafka主题以消费事件

TCP

TCP服务

Email

通过POP3IMAP协议使用电子邮件

JMS

订阅JMS主题或队列以使用事件

File

读取文件中的事件

CDC

数据库CDC日志数据

Prometheus

监控系统数据

In-memory是Siddhi中唯一内置的源,所有其他源类型都作为扩展实现

源映射

每个@source配置都可以有一个由@map注释表示的映射,该注释定义了如何将传入事件格式转换为Siddhi事件。

@map的type参数定义了在转换传入事件时要使用的映射类型。@map annotation的其他参数取决于所选的映射器,其中一些参数可以是可选的。

映射Attributes

@attributes是与@map一起使用的可选注释,用于定义自定义映射

支持的源映射类型

源映射类型

描述

PassThrough

省略Siddhi事件的数据转换

JSON

JSON消息转换为Siddhi事件

XML

XML消息转换为Siddhi事件

TEXT

TEXT中消息转换为Siddhi事件

Avro

Avro事件转换为Siddhi事件

Binary

Siddhi特定的二进制事件转换为Siddhi事件

Key Value

将键值HashMaps转换为Siddhi事件

CSV

将类似CSV分隔符的事件转换为Siddhi事件

提示:

当没有提供@map注释时,使用@map(类型=“passThrough”)作为默认值,它将消耗的Siddhi事件直接传递到流,而不进行任何数据转换。

PassThrough是Siddhi中唯一内置的源映射器,所有其他源映射器都是作为扩展实现的。

例子1

通过公开HTTP服务接收JSON消息,并将它们引导到InputStream流中进行处理。在这里,HTTP服务将通过基本身份验证进行保护,在端口8080和context/foo上的所有网络接口上接收事件。该服务要求JSON消息采用JSON映射器支持的默认数据格式,如下所示。

Query Guide-Stream_自定义_02

HTTP源和JSON源映射器的配置实现了上述功能,如下所示。

Query Guide-Stream_HTTP_03

例子2

通过公开HTTP服务接收JSON消息,并将它们引导到StockStream流中进行处理。在这里,传入的JSON,如下所示,不遵守JSON映射器支持的默认数据格式。

Query Guide-Stream_自定义_04

配置HTTP源和自定义JSON源映射以实现上述功能如下。

Query Guide-Stream_HTTP_05

接收器Sink

目的

Sink通过将事件转换为支持的格式,提供了一种将流的Siddhi事件发布到外部系统的方法。

语法

Query Guide-Stream_自定义_06

以下是Siddhi支持的接收器类型列表

接收器类型

描述

In-memory

允许SiddhiApp使用在同一JVM上运行的其他SiddhiApp中的事件。

HTTP

将事件发布到HTTP服务。

Kafka

将事件发送到Kafka主题

TCP

T将事件发布到TCP服务

Email

通过SMTP协议发送电子邮件

JMS

将事件发布到JMS主题或队列

File

写事件到文件

Log

记录流中出现的事件

Prometheus

将数据发布到监控系统

分布式接收器

分布式接收器使用负载平衡或分区策略将事件从定义的流发布到多个端点。

任何接收器都可以用作分布式接收器。分布式接收器配置允许用户定义一个通用映射,以转换和发送所有目标端点的Siddhi事件。

目的

分布式接收器提供了一种以配置的事件格式将Siddhi事件发布到多个端点的方法。

语法

轮询分布式接收器

以循环方式将事件发布到定义的目的地

Query Guide-Stream_JSON_07

分区分布式接收器

通过基于分区键对事件进行分区,将事件发布到定义的目标。

Query Guide-Stream_HTTP_08

接收器映射Sink Mapper

使用@map做映射

@payload是与@map一起使用的可选注释,用于定义自定义映射

有两种方法可以配置@payload注释

1一些映射器(如XML、JSON和Test)只接受一个输出负载

@payload( 'This is a test message from {{user}}.')

2某些映射器(例如键值)接受一系列映射值

@payload( key1='mapping_1', 'key2'='user : {{user}}')

以下是Siddhi支持的接收器映射类型列表

类型

描述

PassThrough

省略传出Siddhi事件的数据转换

JSON

Siddhi事件转换为JSON消息

XML

Siddhi事件转换为XML消息

TEXT

Siddhi事件转换为纯文本消息

Avro

Siddhi事件转换为Avro事件

Binary

Siddhi事件转换为Siddhi特定的二进制事件

Key Value

Siddhi事件转换为键值HashMaps

CSV

Siddhi事件转换为类似CSV的分隔符分隔事件

例子1

Sink通过将OutputStream事件转换为默认格式的JSON消息并发送到HTTP端点来发布这些事件http://localhost:8005/endpoint1,使用POST方法,Accept头,并且具有admin的基本身份验证是用户名和密码。

HTTP接收器和JSON接收器映射器的配置实现了上述功能,如下所示。

Query Guide-Stream_自定义_09

这将以以下格式发布JSON消息

Query Guide-Stream_HTTP_10

例子2

Sink通过将StockStream事件转换为用户定义的JSON消息并将其发送到HTTP端点来发布StockStream事件http://localhost:8005/stocks.

配置HTTP接收器和自定义JSON接收器映射以实现上述功能如下。

Query Guide-Stream_HTTP_11

这将以以下格式将单个事件发布为JSON消息

Query Guide-Stream_HTTP_12

这也可以将多个事件一起发布为JSON消息,格式如下

Query Guide-Stream_自定义_13

例子3

Sink使用分区策略将OutputStream流中的事件发布到多个HTTP端点。在这里,事件被发送到http://localhost:8005/endpoint1http://localhost:8006/endpoint2基于划分关键国家。当发布到两个端点时,它使用默认的JSON映射、POST方法和admin作为用户名和密码。

Query Guide-Stream_自定义_14

这将对传出事件进行分区,并将具有相同国家/地区属性值的所有事件发布到同一个端点。发布的JSON消息将采用以下格式:

Query Guide-Stream_JSON_15

错误处理Error Handling

Siddhi中的错误可以在Streams和Sink中处理

流中的错误处理

当订阅流的Siddhi元素抛出错误时,错误会传播到将事件传递给这些Siddhi元件的流。默认情况下,会在流中记录并删除错误,但可以通过在相应的流定义中添加@OnError注释来更改此行为@OnError注释可以帮助用户捕获错误和相关事件,并通过将它们发送到故障流来优雅地处理它们。

@OnError注释和需要指定的操作如下。

Query Guide-Stream_JSON_16

@OnError注释的action参数定义了在失败场景中要执行的操作。可以为@OnError注释指定以下操作来处理错误场景。

1日志:记录带有错误的事件,并删除该事件。即使未定义@OnError注释,这也是执行的默认操作。

2 STREAM:创建一个故障流,并将事件和错误重定向到它。创建的故障流将具有在基流中定义的所有属性,以捕获导致错误的事件,此外,它还包含包含错误信息的object类型的_error属性。可以通过添加来引用故障流!在基流前面的名称为<流名称>。

例子

通过将错误重定向到故障流来处理TempStream中的错误

TempStream流和@OnError注释的配置如下

Query Guide-Stream_JSON_17

Siddhi将推断并自动定义TempStream的故障流,如下所示。

Query Guide-Stream_HTTP_18

SiddhiApp通过使用查询添加故障生成和错误处理来扩展上述用例,如下所示。

注:通过查询编写处理逻辑的详细信息将在后面的章节中解释

Query Guide-Stream_自定义_19

接收器错误处理

在某些情况下,当事件发布到外部系统时,外部系统可能会变得不可用或出现错误。默认情况下,sink会记录并删除导致事件丢失的事件,这可以通过配置@sink注释的.error参数来正常处理。

@sink注释的on.error参数可以指定如下。

Query Guide-Stream_HTTP_20

可以为@sink注释的on.error参数指定以下操作以处理错误场景。

1日志:记录带有错误的事件,并删除该事件。这是即使在@sink注释上未定义on.error参数时执行的默认操作

2 WAIT:发布线程在后退和重试模式下等待,并且仅在重新建立连接时发送事件。在此期间,线程将不会消耗任何新消息,从而导致系统在发布到它的系统上引入背压。

3 STREAM:将具有相应错误的失败事件推送到接收器所属的相关故障流。

例子1

当系统无法连接到Kafka时,等待重试

TempStream流和带有on.error属性的@ssink Kafka注释的配置如下。

Query Guide-Stream_HTTP_21

例子2

当系统无法连接到Kafka时,将事件发送到TempStream的故障流

Query Guide-Stream_HTTP_22



【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  dpoUgXS1q0aA   2023年12月12日   25   0   0 JSONJSON数据数据
ha7vY1zXUFvq
最新推荐 更多

2024-05-31