flume拦截器
  7BOUFwwpUIXU 2023年11月02日 41 0


RegexExtractorInterceptor作为一个Interceptor实现类可以根据一个正则表达式匹配event body来提取字符串,并使用serializers把字符串作为header的值

实例:
以如下的命令使用execsource收集日志的时候,可以根据文件的名称设置不同的header,进行不同的操作



1



2



3



4


#!/bin/sh



filename=$ 1



hostname=`hostname -s`



tail -F $ 1  | awk -v filename=$filename -v hostname=$hostname  '{print filename":"hostname":"$0}'




source的配置:



1



2



3



4



5



6



7



8


xxxx.sources.kafka1.interceptors = i1



xxxx.sources.kafka1.interceptors.i1.type = regex_extractor



xxxx.sources.kafka1.interceptors.i1.regex = /apps/logs/(.*?)/



xxxx.sources.kafka1.interceptors.i1.serializers = s1



xxxx.sources.kafka1.interceptors.i1.serializers.s1.name = logtypename



xxxx.sources.kafka1.selector.type = multiplexing



xxxx.sources.kafka1.selector.header = logtypename



xxxx.sources.kafka1.selector.mapping.nginx = nginx-channel




几个参数项:
regex 正则表达式



1



2



3



4



5



6


serializers  定义匹配组(正则匹配之后的值作为header的值,比如如果



Event body为 1 : 2 : 3 .4foobar5,regex为(\\d):(\\d):(\\d),serializers 



设置为a b c,serializers.a.name 为one,serializers.b.name为two,serializers.c.name



为three,那么one-> 1 ,two-> 2 ,three-> 3 .4foobar5,注意可以不必匹配所有的组)



 



serializers.x.name 作为event的header




首先看内部类Builder:
1)configureSerializers方法用来生成配置项,主要是操作List<NameAndSerializer>,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象,RegexExtractorInterceptorSerializer默认是org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer,即对参数不做任何处理直接返回:



1



2



3



4



5



6



7



8



9



10



11



12



13



14



15



16



17



18



19



20



21



22



23



24



25



26


private  List<NameAndSerializer> serializerList;



private  final  RegexExtractorInterceptorSerializer defaultSerializer =  new  RegexExtractorInterceptorPassThroughSerializer();



....



private  void  configureSerializers(Context context) {



String serializerListStr = context.getString( SERIALIZERS );  //解析serializers的配置



Preconditions. checkArgument(!StringUtils. isEmpty(serializerListStr),



"Must supply at least one name and serializer"  );



String[] serializerNames = serializerListStr.split(  "\\s+"  );  //按空格分隔



Context serializerContexts =



new  Context(context.getSubProperties( SERIALIZERS +  "." ));



serializerList = Lists. newArrayListWithCapacity(serializerNames.length);



for (String serializerName : serializerNames) {  //对每一个serializers里面的设置进行操作



Context serializerContext =  new  Context(



serializerContexts.getSubProperties(serializerName +  "."  ));



String type = serializerContext.getString(  "type"  "DEFAULT"  );  //获取serializers.x.type的设置,默认值是DEFAULT,即org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer



String name = serializerContext.getString(  "name"  );  获取serializers.x.name的设置



Preconditions. checkArgument(!StringUtils. isEmpty(name),



"Supplied name cannot be empty."  );



if  ( "DEFAULT"  .equals(type)) {



serializerList .add( new  NameAndSerializer(name, defaultSerializer));  //生成NameAndSerializer对象,并加入到List<NameAndSerializer>中,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象



else  {



serializerList .add( new  NameAndSerializer(name, getCustomSerializer(



type, serializerContext)));  //getCustomSerializer用于根据type的设置返回RegexExtractorInterceptorSerializer对象



}



}



}




这里org.apache.flume.interceptor.RegexExtractorInterceptorSerializer 接口类,定义了一个抽象方法serialize,实现类包括:



1



2



3



4


org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer 



//直接返回,不做另外的操作(默认的类)



org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer 



//使用指定的formatting pattern把传入的值转换为milliseconds




  
2)build方法用于返回一个RegexExtractorInterceptor对象



1


return  new  RegexExtractorInterceptor( regex , serializerList );




RegexExtractorInterceptor的主要方法intercept:



1



2



3



4



5



6



7



8



9



10



11



12



13



14



15



16



17



18



19



20



21



22


static  final  String REGEX =  "regex"  ;



static  final  String SERIALIZERS =  "serializers"  ;



...



public  Event intercept(Event event) {



Matcher matcher = regex.matcher(



new  String(event.getBody(), Charsets.UTF_8));  //对Event的body进行matcher操作



Map<String, String> headers = event.getHeaders();  // 获取Event的header键值对



if  (matcher.find()) {  //检测字符串中的子字符串是否可以匹配到正则



for  int  group =  0 , count = matcher.groupCount(); group < count; group++) {



int  groupIndex = group +  1 // 匹配的index从1开始



if  (groupIndex > serializers .size()) {  //判断index是否大于serializers列表(configure产生的List<NameAndSerializer>)的长度



....



break ;



}



NameAndSerializer serializer = serializers.get(group);  //从serializers中获取对应的NameAndSerializer 对象



....



headers.put(serializer. headerName,



serializer. serializer.serialize(matcher.group(groupIndex)));  // 向Event中插入headerName和对应的value,这里headerName即为serializers.x.name的设置,value会通过RegexExtractorInterceptorSerializer进行处理



}



}



return  event;



}




 

本文出自 “菜光光的博客” 博客,请务必保留此出处http://caiguangguang.blog.51cto.com/1652935/1619537

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
7BOUFwwpUIXU
最新推荐 更多

2024-05-31