flume学习(六):使用hive来分析flume收集的日志数据
  J5Tcpbf1dOCP 2023年11月02日 57 0


flume学习(六):使用hive来分析flume收集的日志数据


前面已经讲过如何将log4j的日志输出到指定的hdfs目录,我们前面的指定目录为/flume/events。


如果想用hive来分析采集来的日志,我们可以将/flume/events下面的日志数据都load到hive中的表当中去。


如果了解hive的load data原理的话,还有一种更简便的方式,可以省去load data这一步,就是直接将sink1.hdfs.path指定为hive表的目录。

下面我将详细描述具体的操作步骤。

我们还是从需求驱动来讲解,前面我们采集的数据,都是接口的访问日志数据,数据格式是JSON格式如下:

{"requestTime":1405651379758,"requestParams":{"timestamp":1405651377211,"phone":"02038824941","cardName":"测试商家名称","provinceCode":"440000","cityCode":"440106"},"requestUrl":"/reporter-api/reporter/reporter12/init.do"}

现在有一个需求,我们要统计接口的总调用量。

我第一想法就是,hive中建一张表:test             然后将hdfs.path指定为tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test

然后select  count(*) from test;   完事。

这个方案简单,粗暴,先这么干着。于是会遇到一个问题,我的日志数据时JSON格式的,需要hive来序列化和反序列化JSON格式的数据到test表的具体字段当中去。

这有点糟糕,因为hive本身没有提供JSON的SERDE,但是有提供函数来解析JSON字符串,

第一个是(UDF):

 get_json_object(string json_string,string path) 从给定路径上的JSON字符串中抽取出JSON对象,并返回这个对象的JSON字符串形式,如果输入的JSON字符串是非法的,则返回NULL。

第二个是表生成函数(UDTF):json_tuple(string jsonstr,p1,p2,...,pn) 本函数可以接受多个标签名称,对输入的JSON字符串进行处理,这个和get_json_object这个UDF类似,不过更高效,其通过一次调用就可以获得多个键值,例:select b.* from test_json a lateral view json_tuple(a.id,'id','name') b as f1,f2;通过lateral view行转列。


最理想的方式就是能有一种JSON SERDE,只要我们LOAD完数据,就直接可以select * from test,而不是select get_json_object这种方式来获取,N个字段就要解析N次,效率太低了。

好在cloudrea wiki里提供了一个json serde类(这个类没有在发行的hive的jar包中),于是我把它搬来了,如下:



[java]  
    view plain 
    copy 
    
 
    
 
  
1. package com.besttone.hive.serde;  
2.   
3. import java.util.ArrayList;  
4. import java.util.Arrays;  
5. import java.util.HashMap;  
6. import java.util.List;  
7. import java.util.Map;  
8. import java.util.Properties;  
9.   
10. import org.apache.hadoop.conf.Configuration;  
11. import org.apache.hadoop.hive.serde.serdeConstants;  
12. import org.apache.hadoop.hive.serde2.SerDe;  
13. import org.apache.hadoop.hive.serde2.SerDeException;  
14. import org.apache.hadoop.hive.serde2.SerDeStats;  
15. import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;  
16. import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;  
17. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;  
18. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;  
19. import org.apache.hadoop.hive.serde2.objectinspector.StructField;  
20. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;  
21. import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;  
22. import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;  
23. import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;  
24. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;  
25. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;  
26. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;  
27. import org.apache.hadoop.io.Text;  
28. import org.apache.hadoop.io.Writable;  
29. import org.codehaus.jackson.map.ObjectMapper;  
30.   
31. /**
32.  * This SerDe can be used for processing JSON data in Hive. It supports
33.  * arbitrary JSON data, and can handle all Hive types except for UNION. However,
34.  * the JSON data is expected to be a series of discrete records, rather than a
35.  * JSON array of objects.
36.  * 
37.  * The Hive table is expected to contain columns with names corresponding to
38.  * fields in the JSON data, but it is not necessary for every JSON field to have
39.  * a corresponding Hive column. Those JSON fields will be ignored during
40.  * queries.
41.  * 
42.  * Example:
43.  * 
44.  * { "a": 1, "b": [ "str1", "str2" ], "c": { "field1": "val1" } }
45.  * 
46.  * Could correspond to a table:
47.  * 
48.  * CREATE TABLE foo (a INT, b ARRAY<STRING>, c STRUCT<field1:STRING>);
49.  * 
50.  * JSON objects can also interpreted as a Hive MAP type, so long as the keys and
51.  * values in the JSON object are all of the appropriate types. For example, in
52.  * the JSON above, another valid table declaraction would be:
53.  * 
54.  * CREATE TABLE foo (a INT, b ARRAY<STRING>, c MAP<STRING,STRING>);
55.  * 
56.  * Only STRING keys are supported for Hive MAPs.
57.  */  
58. public class JSONSerDe implements SerDe {  
59.   
60. private StructTypeInfo rowTypeInfo;  
61. private ObjectInspector rowOI;  
62. private List<String> colNames;  
63. private List<Object> row = new ArrayList<Object>();  
64.   
65. //遇到非JSON格式输入的时候的处理。  
66. private boolean ignoreInvalidInput;  
67.   
68. /**
69.      * An initialization function used to gather information about the table.
70.      * Typically, a SerDe implementation will be interested in the list of
71.      * column names and their types. That information will be used to help
72.      * perform actual serialization and deserialization of data.
73.      */  
74. @Override  
75. public void initialize(Configuration conf, Properties tbl)  
76. throws SerDeException {  
77. // 遇到无法转换成JSON对象的字符串时,是否忽略,默认不忽略,抛出异常,设置为true将跳过异常。  
78.         ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(  
79. "input.invalid.ignore", "false"));  
80.   
81. // Get a list of the table's column names.  
82.   
83.         String colNamesStr = tbl.getProperty(serdeConstants.LIST_COLUMNS);  
84. ","));  
85.   
86. // Get a list of TypeInfos for the columns. This list lines up with  
87. // the list of column names.  
88.         String colTypesStr = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);  
89.         List<TypeInfo> colTypes = TypeInfoUtils  
90.                 .getTypeInfosFromTypeString(colTypesStr);  
91.   
92.         rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(  
93.                 colNames, colTypes);  
94.         rowOI = TypeInfoUtils  
95.                 .getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);  
96.     }  
97.   
98. /**
99.      * This method does the work of deserializing a record into Java objects
100.      * that Hive can work with via the ObjectInspector interface. For this
101.      * SerDe, the blob that is passed in is a JSON string, and the Jackson JSON
102.      * parser is being used to translate the string into Java objects.
103.      * 
104.      * The JSON deserialization works by taking the column names in the Hive
105.      * table, and looking up those fields in the parsed JSON object. If the
106.      * value of the field is not a primitive, the object is parsed further.
107.      */  
108. @Override  
109. public Object deserialize(Writable blob) throws SerDeException {  
110. null;  
111.         row.clear();  
112. try {  
113. new ObjectMapper();  
114. // This is really a Map<String, Object>. For more information about  
115. // how  
116. // Jackson parses JSON in this example, see  
117. // http://wiki.fasterxml.com/JacksonDataBinding  
118. class);  
119. catch (Exception e) {  
120. // 如果为true,不抛出异常,忽略该行数据  
121. if (!ignoreInvalidInput)  
122. throw new SerDeException(e);  
123. else {  
124. return null;  
125.             }  
126.               
127.         }  
128.   
129. // Lowercase the keys as expected by hive  
130. new HashMap();  
131. for (Map.Entry entry : root.entrySet()) {  
132.             lowerRoot.put(((String) entry.getKey()).toLowerCase(),  
133.                     entry.getValue());  
134.         }  
135.         root = lowerRoot;  
136.   
137. null;  
138. for (String fieldName : rowTypeInfo.getAllStructFieldNames()) {  
139. try {  
140.                 TypeInfo fieldTypeInfo = rowTypeInfo  
141.                         .getStructFieldTypeInfo(fieldName);  
142.                 value = parseField(root.get(fieldName), fieldTypeInfo);  
143. catch (Exception e) {  
144. null;  
145.             }  
146.             row.add(value);  
147.         }  
148. return row;  
149.     }  
150.   
151. /**
152.      * Parses a JSON object according to the Hive column's type.
153.      * 
154.      * @param field
155.      *            - The JSON object to parse
156.      * @param fieldTypeInfo
157.      *            - Metadata about the Hive column
158.      * @return - The parsed value of the field
159.      */  
160. private Object parseField(Object field, TypeInfo fieldTypeInfo) {  
161. switch (fieldTypeInfo.getCategory()) {  
162. case PRIMITIVE:  
163. // Jackson will return the right thing in this case, so just return  
164. // the object  
165. if (field instanceof String) {  
166. "\n", "\\\\n");  
167.             }  
168. return field;  
169. case LIST:  
170. return parseList(field, (ListTypeInfo) fieldTypeInfo);  
171. case MAP:  
172. return parseMap(field, (MapTypeInfo) fieldTypeInfo);  
173. case STRUCT:  
174. return parseStruct(field, (StructTypeInfo) fieldTypeInfo);  
175. case UNION:  
176. // Unsupported by JSON  
177. default:  
178. return null;  
179.         }  
180.     }  
181.   
182. /**
183.      * Parses a JSON object and its fields. The Hive metadata is used to
184.      * determine how to parse the object fields.
185.      * 
186.      * @param field
187.      *            - The JSON object to parse
188.      * @param fieldTypeInfo
189.      *            - Metadata about the Hive column
190.      * @return - A map representing the object and its fields
191.      */  
192. private Object parseStruct(Object field, StructTypeInfo fieldTypeInfo) {  
193.         Map<Object, Object> map = (Map<Object, Object>) field;  
194.         ArrayList<TypeInfo> structTypes = fieldTypeInfo  
195.                 .getAllStructFieldTypeInfos();  
196.         ArrayList<String> structNames = fieldTypeInfo.getAllStructFieldNames();  
197.   
198. new ArrayList<Object>(structTypes.size());  
199. for (int i = 0; i < structNames.size(); i++) {  
200.             structRow.add(parseField(map.get(structNames.get(i)),  
201.                     structTypes.get(i)));  
202.         }  
203. return structRow;  
204.     }  
205.   
206. /**
207.      * Parse a JSON list and its elements. This uses the Hive metadata for the
208.      * list elements to determine how to parse the elements.
209.      * 
210.      * @param field
211.      *            - The JSON list to parse
212.      * @param fieldTypeInfo
213.      *            - Metadata about the Hive column
214.      * @return - A list of the parsed elements
215.      */  
216. private Object parseList(Object field, ListTypeInfo fieldTypeInfo) {  
217.         ArrayList<Object> list = (ArrayList<Object>) field;  
218.         TypeInfo elemTypeInfo = fieldTypeInfo.getListElementTypeInfo();  
219.   
220. for (int i = 0; i < list.size(); i++) {  
221.             list.set(i, parseField(list.get(i), elemTypeInfo));  
222.         }  
223.   
224. return list.toArray();  
225.     }  
226.   
227. /**
228.      * Parse a JSON object as a map. This uses the Hive metadata for the map
229.      * values to determine how to parse the values. The map is assumed to have a
230.      * string for a key.
231.      * 
232.      * @param field
233.      *            - The JSON list to parse
234.      * @param fieldTypeInfo
235.      *            - Metadata about the Hive column
236.      * @return
237.      */  
238. private Object parseMap(Object field, MapTypeInfo fieldTypeInfo) {  
239.         Map<Object, Object> map = (Map<Object, Object>) field;  
240.         TypeInfo valueTypeInfo = fieldTypeInfo.getMapValueTypeInfo();  
241.   
242. for (Map.Entry<Object, Object> entry : map.entrySet()) {  
243.             map.put(entry.getKey(), parseField(entry.getValue(), valueTypeInfo));  
244.         }  
245. return map;  
246.     }  
247.   
248. /**
249.      * Return an ObjectInspector for the row of data
250.      */  
251. @Override  
252. public ObjectInspector getObjectInspector() throws SerDeException {  
253. return rowOI;  
254.     }  
255.   
256. /**
257.      * Unimplemented
258.      */  
259. @Override  
260. public SerDeStats getSerDeStats() {  
261. return null;  
262.     }  
263.   
264. /**
265.      * JSON is just a textual representation, so our serialized class is just
266.      * Text.
267.      */  
268. @Override  
269. public Class<? extends Writable> getSerializedClass() {  
270. return Text.class;  
271.     }  
272.   
273. /**
274.      * This method takes an object representing a row of data from Hive, and
275.      * uses the ObjectInspector to get the data for each column and serialize
276.      * it. This implementation deparses the row into an object that Jackson can
277.      * easily serialize into a JSON blob.
278.      */  
279. @Override  
280. public Writable serialize(Object obj, ObjectInspector oi)  
281. throws SerDeException {  
282.         Object deparsedObj = deparseRow(obj, oi);  
283. new ObjectMapper();  
284. try {  
285. // Let Jackson do the work of serializing the object  
286. return new Text(mapper.writeValueAsString(deparsedObj));  
287. catch (Exception e) {  
288. throw new SerDeException(e);  
289.         }  
290.     }  
291.   
292. /**
293.      * Deparse a Hive object into a Jackson-serializable object. This uses the
294.      * ObjectInspector to extract the column data.
295.      * 
296.      * @param obj
297.      *            - Hive object to deparse
298.      * @param oi
299.      *            - ObjectInspector for the object
300.      * @return - A deparsed object
301.      */  
302. private Object deparseObject(Object obj, ObjectInspector oi) {  
303. switch (oi.getCategory()) {  
304. case LIST:  
305. return deparseList(obj, (ListObjectInspector) oi);  
306. case MAP:  
307. return deparseMap(obj, (MapObjectInspector) oi);  
308. case PRIMITIVE:  
309. return deparsePrimitive(obj, (PrimitiveObjectInspector) oi);  
310. case STRUCT:  
311. return deparseStruct(obj, (StructObjectInspector) oi, false);  
312. case UNION:  
313. // Unsupported by JSON  
314. default:  
315. return null;  
316.         }  
317.     }  
318.   
319. /**
320.      * Deparses a row of data. We have to treat this one differently from other
321.      * structs, because the field names for the root object do not match the
322.      * column names for the Hive table.
323.      * 
324.      * @param obj
325.      *            - Object representing the top-level row
326.      * @param structOI
327.      *            - ObjectInspector for the row
328.      * @return - A deparsed row of data
329.      */  
330. private Object deparseRow(Object obj, ObjectInspector structOI) {  
331. return deparseStruct(obj, (StructObjectInspector) structOI, true);  
332.     }  
333.   
334. /**
335.      * Deparses struct data into a serializable JSON object.
336.      * 
337.      * @param obj
338.      *            - Hive struct data
339.      * @param structOI
340.      *            - ObjectInspector for the struct
341.      * @param isRow
342.      *            - Whether or not this struct represents a top-level row
343.      * @return - A deparsed struct
344.      */  
345. private Object deparseStruct(Object obj, StructObjectInspector structOI,  
346. boolean isRow) {  
347. new HashMap<Object, Object>();  
348. extends StructField> fields = structOI.getAllStructFieldRefs();  
349. for (int i = 0; i < fields.size(); i++) {  
350.             StructField field = fields.get(i);  
351. // The top-level row object is treated slightly differently from  
352. // other  
353. // structs, because the field names for the row do not correctly  
354. // reflect  
355. // the Hive column names. For lower-level structs, we can get the  
356. // field  
357. // name from the associated StructField object.  
358.             String fieldName = isRow ? colNames.get(i) : field.getFieldName();  
359.             ObjectInspector fieldOI = field.getFieldObjectInspector();  
360.             Object fieldObj = structOI.getStructFieldData(obj, field);  
361.             struct.put(fieldName, deparseObject(fieldObj, fieldOI));  
362.         }  
363. return struct;  
364.     }  
365.   
366. /**
367.      * Deparses a primitive type.
368.      * 
369.      * @param obj
370.      *            - Hive object to deparse
371.      * @param oi
372.      *            - ObjectInspector for the object
373.      * @return - A deparsed object
374.      */  
375. private Object deparsePrimitive(Object obj, PrimitiveObjectInspector primOI) {  
376. return primOI.getPrimitiveJavaObject(obj);  
377.     }  
378.   
379. private Object deparseMap(Object obj, MapObjectInspector mapOI) {  
380. new HashMap<Object, Object>();  
381.         ObjectInspector mapValOI = mapOI.getMapValueObjectInspector();  
382.         Map<?, ?> fields = mapOI.getMap(obj);  
383. for (Map.Entry<?, ?> field : fields.entrySet()) {  
384.             Object fieldName = field.getKey();  
385.             Object fieldObj = field.getValue();  
386.             map.put(fieldName, deparseObject(fieldObj, mapValOI));  
387.         }  
388. return map;  
389.     }  
390.   
391. /**
392.      * Deparses a list and its elements.
393.      * 
394.      * @param obj
395.      *            - Hive object to deparse
396.      * @param oi
397.      *            - ObjectInspector for the object
398.      * @return - A deparsed object
399.      */  
400. private Object deparseList(Object obj, ListObjectInspector listOI) {  
401. new ArrayList<Object>();  
402.         List<?> field = listOI.getList(obj);  
403.         ObjectInspector elemOI = listOI.getListElementObjectInspector();  
404. for (Object elem : field) {  
405.             list.add(deparseObject(elem, elemOI));  
406.         }  
407. return list;  
408.     }  
409. }

我稍微修改了一点东西,多加了一个参数input.invalid.ignore,对应的变量为:


//遇到非JSON格式输入的时候的处理。
private boolean ignoreInvalidInput;


在deserialize方法中原来是如果传入的是非JSON格式字符串的话,直接抛出了SerDeException,我加了一个参数来控制它是否抛出异常,在initialize方法中初始化这个变量(默认为false):


// 遇到无法转换成JSON对象的字符串时,是否忽略,默认不忽略,抛出异常,设置为true将跳过异常。
ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(
"input.invalid.ignore", "false"));


好的,现在将这个类打成JAR包: JSONSerDe.jar,放在hive_home的auxlib目录下(我的是/etc/hive/auxlib),然后修改hive-env.sh,添加HIVE_AUX_JARS_PATH=/etc/hive/auxlib/JSONSerDe.jar,这样每次运行hive客户端的时候都会将这个jar包添加到classpath,否则在设置SERDE的时候会报找不到类。

现在我们在HIVE中创建一张表用来存放日志数据:


[plain]  
    view plain 
    copy 
    
 
    
 
  
1. create table test(  
2. requestTime BIGINT,  
3. requestParams STRUCT<timestamp:BIGINT,phone:STRING,cardName:STRING,provinceCode:STRING,cityCode:STRING>,    
4. requestUrl STRING)  
5.  row format serde "com.besttone.hive.serde.JSONSerDe"   
6.  WITH SERDEPROPERTIES(  
7.  "input.invalid.ignore"="true",  
8.  "requestTime"="$.requestTime",  
9.  "requestParams.timestamp"="$.requestParams.timestamp",  
10.  "requestParams.phone"="$.requestParams.phone",  
11.  "requestParams.cardName"="$.requestParams.cardName",  
12.  "requestParams.provinceCode"="$.requestParams.provinceCode",  
13.  "requestParams.cityCode"="$.requestParams.cityCode",  
14.  "requestUrl"="$.requestUrl");



这个表结构就是按照日志格式设计的,还记得前面说过的日志数据如下:


{"requestTime":1405651379758,"requestParams":{"timestamp":1405651377211,"phone":"02038824941","cardName":"测试商家名称","provinceCode":"440000","cityCode":"440106"},"requestUrl":"/reporter-api/reporter/reporter12/init.do"}

我使用了一个STRUCT类型来保存requestParams的值,row format我们用的是自定义的json serde:com.besttone.hive.serde.JSONSerDe,SERDEPROPERTIES中,除了设置JSON对象的映射关系外,我还设置了一个自定义的参数:"input.invalid.ignore"="true",忽略掉所有非JSON格式的输入行。这里不是真正意义的忽略,只是非法行的每个输出字段都为NULL了,要在结果集上忽略,必须这样写:select * from test where requestUrl is not null;

OK表建好了,现在就差数据了,我们启动flumedemo的WriteLog,往hive表test目录下面输出一些日志数据,然后在进入hive客户端,select * from test;所以字段都正确的解析,大功告成。

flume.conf如下:

[plain]  
    view plain 
    copy 
    
 
    
 
  
1. tier1.sources=source1  
2. tier1.channels=channel1  
3. tier1.sinks=sink1  
4.   
5. tier1.sources.source1.type=avro  
6. tier1.sources.source1.bind=0.0.0.0  
7. tier1.sources.source1.port=44444  
8. tier1.sources.source1.channels=channel1  
9.   
10. tier1.sources.source1.interceptors=i1 i2  
11. tier1.sources.source1.interceptors.i1.type=regex_filter  
12. tier1.sources.source1.interceptors.i1.regex=\\{.*\\}  
13. tier1.sources.source1.interceptors.i2.type=timestamp  
14.   
15. tier1.channels.channel1.type=memory  
16. tier1.channels.channel1.capacity=10000  
17. tier1.channels.channel1.transactionCapacity=1000  
18. tier1.channels.channel1.keep-alive=30  
19.   
20. tier1.sinks.sink1.type=hdfs  
21. tier1.sinks.sink1.channel=channel1  
22. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test  
23. tier1.sinks.sink1.hdfs.fileType=DataStream  
24. tier1.sinks.sink1.hdfs.writeFormat=Text  
25. tier1.sinks.sink1.hdfs.rollInterval=0  
26. tier1.sinks.sink1.hdfs.rollSize=10240  
27. tier1.sinks.sink1.hdfs.rollCount=0  
28. tier1.sinks.sink1.hdfs.idleTimeout=60


besttone.db是我在hive中创建的数据库,了解hive的应该理解没多大问题。



OK,到这篇文章为止,整个从LOG4J生产日志,到flume收集日志,再到用hive离线分析日志,一整套流水线都讲解完了。


如果想用hive来分析采集来的日志,我们可以将/flume/events下面的日志数据都load到hive中的表当中去。


如果了解hive的load data原理的话,还有一种更简便的方式,可以省去load data这一步,就是直接将sink1.hdfs.path指定为hive表的目录。

下面我将详细描述具体的操作步骤。

我们还是从需求驱动来讲解,前面我们采集的数据,都是接口的访问日志数据,数据格式是JSON格式如下:

{"requestTime":1405651379758,"requestParams":{"timestamp":1405651377211,"phone":"02038824941","cardName":"测试商家名称","provinceCode":"440000","cityCode":"440106"},"requestUrl":"/reporter-api/reporter/reporter12/init.do"}


现在有一个需求,我们要统计接口的总调用量。

我第一想法就是,hive中建一张表:test             然后将hdfs.path指定为tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test

然后select  count(*) from test;   完事。

这个方案简单,粗暴,先这么干着。于是会遇到一个问题,我的日志数据时JSON格式的,需要hive来序列化和反序列化JSON格式的数据到test表的具体字段当中去。

这有点糟糕,因为hive本身没有提供JSON的SERDE,但是有提供函数来解析JSON字符串,

第一个是(UDF):

 get_json_object(string json_string,string path) 从给定路径上的JSON字符串中抽取出JSON对象,并返回这个对象的JSON字符串形式,如果输入的JSON字符串是非法的,则返回NULL。

第二个是表生成函数(UDTF):json_tuple(string jsonstr,p1,p2,...,pn) 本函数可以接受多个标签名称,对输入的JSON字符串进行处理,这个和get_json_object这个UDF类似,不过更高效,其通过一次调用就可以获得多个键值,例:select b.* from test_json a lateral view json_tuple(a.id,'id','name') b as f1,f2;通过lateral view行转列。


最理想的方式就是能有一种JSON SERDE,只要我们LOAD完数据,就直接可以select * from test,而不是select get_json_object这种方式来获取,N个字段就要解析N次,效率太低了。

好在cloudrea wiki里提供了一个json serde类(这个类没有在发行的hive的jar包中),于是我把它搬来了,如下:



[java]  
    view plain 
    copy 
    
 
    
 
  
1. package com.besttone.hive.serde;  
2.   
3. import java.util.ArrayList;  
4. import java.util.Arrays;  
5. import java.util.HashMap;  
6. import java.util.List;  
7. import java.util.Map;  
8. import java.util.Properties;  
9.   
10. import org.apache.hadoop.conf.Configuration;  
11. import org.apache.hadoop.hive.serde.serdeConstants;  
12. import org.apache.hadoop.hive.serde2.SerDe;  
13. import org.apache.hadoop.hive.serde2.SerDeException;  
14. import org.apache.hadoop.hive.serde2.SerDeStats;  
15. import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;  
16. import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;  
17. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;  
18. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;  
19. import org.apache.hadoop.hive.serde2.objectinspector.StructField;  
20. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;  
21. import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;  
22. import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;  
23. import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;  
24. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;  
25. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;  
26. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;  
27. import org.apache.hadoop.io.Text;  
28. import org.apache.hadoop.io.Writable;  
29. import org.codehaus.jackson.map.ObjectMapper;  
30.   
31. /**
32.  * This SerDe can be used for processing JSON data in Hive. It supports
33.  * arbitrary JSON data, and can handle all Hive types except for UNION. However,
34.  * the JSON data is expected to be a series of discrete records, rather than a
35.  * JSON array of objects.
36.  * 
37.  * The Hive table is expected to contain columns with names corresponding to
38.  * fields in the JSON data, but it is not necessary for every JSON field to have
39.  * a corresponding Hive column. Those JSON fields will be ignored during
40.  * queries.
41.  * 
42.  * Example:
43.  * 
44.  * { "a": 1, "b": [ "str1", "str2" ], "c": { "field1": "val1" } }
45.  * 
46.  * Could correspond to a table:
47.  * 
48.  * CREATE TABLE foo (a INT, b ARRAY<STRING>, c STRUCT<field1:STRING>);
49.  * 
50.  * JSON objects can also interpreted as a Hive MAP type, so long as the keys and
51.  * values in the JSON object are all of the appropriate types. For example, in
52.  * the JSON above, another valid table declaraction would be:
53.  * 
54.  * CREATE TABLE foo (a INT, b ARRAY<STRING>, c MAP<STRING,STRING>);
55.  * 
56.  * Only STRING keys are supported for Hive MAPs.
57.  */  
58. public class JSONSerDe implements SerDe {  
59.   
60. private StructTypeInfo rowTypeInfo;  
61. private ObjectInspector rowOI;  
62. private List<String> colNames;  
63. private List<Object> row = new ArrayList<Object>();  
64.   
65. //遇到非JSON格式输入的时候的处理。  
66. private boolean ignoreInvalidInput;  
67.   
68. /**
69.      * An initialization function used to gather information about the table.
70.      * Typically, a SerDe implementation will be interested in the list of
71.      * column names and their types. That information will be used to help
72.      * perform actual serialization and deserialization of data.
73.      */  
74. @Override  
75. public void initialize(Configuration conf, Properties tbl)  
76. throws SerDeException {  
77. // 遇到无法转换成JSON对象的字符串时,是否忽略,默认不忽略,抛出异常,设置为true将跳过异常。  
78.         ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(  
79. "input.invalid.ignore", "false"));  
80.   
81. // Get a list of the table's column names.  
82.   
83.         String colNamesStr = tbl.getProperty(serdeConstants.LIST_COLUMNS);  
84. ","));  
85.   
86. // Get a list of TypeInfos for the columns. This list lines up with  
87. // the list of column names.  
88.         String colTypesStr = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);  
89.         List<TypeInfo> colTypes = TypeInfoUtils  
90.                 .getTypeInfosFromTypeString(colTypesStr);  
91.   
92.         rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(  
93.                 colNames, colTypes);  
94.         rowOI = TypeInfoUtils  
95.                 .getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);  
96.     }  
97.   
98. /**
99.      * This method does the work of deserializing a record into Java objects
100.      * that Hive can work with via the ObjectInspector interface. For this
101.      * SerDe, the blob that is passed in is a JSON string, and the Jackson JSON
102.      * parser is being used to translate the string into Java objects.
103.      * 
104.      * The JSON deserialization works by taking the column names in the Hive
105.      * table, and looking up those fields in the parsed JSON object. If the
106.      * value of the field is not a primitive, the object is parsed further.
107.      */  
108. @Override  
109. public Object deserialize(Writable blob) throws SerDeException {  
110. null;  
111.         row.clear();  
112. try {  
113. new ObjectMapper();  
114. // This is really a Map<String, Object>. For more information about  
115. // how  
116. // Jackson parses JSON in this example, see  
117. // http://wiki.fasterxml.com/JacksonDataBinding  
118. class);  
119. catch (Exception e) {  
120. // 如果为true,不抛出异常,忽略该行数据  
121. if (!ignoreInvalidInput)  
122. throw new SerDeException(e);  
123. else {  
124. return null;  
125.             }  
126.               
127.         }  
128.   
129. // Lowercase the keys as expected by hive  
130. new HashMap();  
131. for (Map.Entry entry : root.entrySet()) {  
132.             lowerRoot.put(((String) entry.getKey()).toLowerCase(),  
133.                     entry.getValue());  
134.         }  
135.         root = lowerRoot;  
136.   
137. null;  
138. for (String fieldName : rowTypeInfo.getAllStructFieldNames()) {  
139. try {  
140.                 TypeInfo fieldTypeInfo = rowTypeInfo  
141.                         .getStructFieldTypeInfo(fieldName);  
142.                 value = parseField(root.get(fieldName), fieldTypeInfo);  
143. catch (Exception e) {  
144. null;  
145.             }  
146.             row.add(value);  
147.         }  
148. return row;  
149.     }  
150.   
151. /**
152.      * Parses a JSON object according to the Hive column's type.
153.      * 
154.      * @param field
155.      *            - The JSON object to parse
156.      * @param fieldTypeInfo
157.      *            - Metadata about the Hive column
158.      * @return - The parsed value of the field
159.      */  
160. private Object parseField(Object field, TypeInfo fieldTypeInfo) {  
161. switch (fieldTypeInfo.getCategory()) {  
162. case PRIMITIVE:  
163. // Jackson will return the right thing in this case, so just return  
164. // the object  
165. if (field instanceof String) {  
166. "\n", "\\\\n");  
167.             }  
168. return field;  
169. case LIST:  
170. return parseList(field, (ListTypeInfo) fieldTypeInfo);  
171. case MAP:  
172. return parseMap(field, (MapTypeInfo) fieldTypeInfo);  
173. case STRUCT:  
174. return parseStruct(field, (StructTypeInfo) fieldTypeInfo);  
175. case UNION:  
176. // Unsupported by JSON  
177. default:  
178. return null;  
179.         }  
180.     }  
181.   
182. /**
183.      * Parses a JSON object and its fields. The Hive metadata is used to
184.      * determine how to parse the object fields.
185.      * 
186.      * @param field
187.      *            - The JSON object to parse
188.      * @param fieldTypeInfo
189.      *            - Metadata about the Hive column
190.      * @return - A map representing the object and its fields
191.      */  
192. private Object parseStruct(Object field, StructTypeInfo fieldTypeInfo) {  
193.         Map<Object, Object> map = (Map<Object, Object>) field;  
194.         ArrayList<TypeInfo> structTypes = fieldTypeInfo  
195.                 .getAllStructFieldTypeInfos();  
196.         ArrayList<String> structNames = fieldTypeInfo.getAllStructFieldNames();  
197.   
198. new ArrayList<Object>(structTypes.size());  
199. for (int i = 0; i < structNames.size(); i++) {  
200.             structRow.add(parseField(map.get(structNames.get(i)),  
201.                     structTypes.get(i)));  
202.         }  
203. return structRow;  
204.     }  
205.   
206. /**
207.      * Parse a JSON list and its elements. This uses the Hive metadata for the
208.      * list elements to determine how to parse the elements.
209.      * 
210.      * @param field
211.      *            - The JSON list to parse
212.      * @param fieldTypeInfo
213.      *            - Metadata about the Hive column
214.      * @return - A list of the parsed elements
215.      */  
216. private Object parseList(Object field, ListTypeInfo fieldTypeInfo) {  
217.         ArrayList<Object> list = (ArrayList<Object>) field;  
218.         TypeInfo elemTypeInfo = fieldTypeInfo.getListElementTypeInfo();  
219.   
220. for (int i = 0; i < list.size(); i++) {  
221.             list.set(i, parseField(list.get(i), elemTypeInfo));  
222.         }  
223.   
224. return list.toArray();  
225.     }  
226.   
227. /**
228.      * Parse a JSON object as a map. This uses the Hive metadata for the map
229.      * values to determine how to parse the values. The map is assumed to have a
230.      * string for a key.
231.      * 
232.      * @param field
233.      *            - The JSON list to parse
234.      * @param fieldTypeInfo
235.      *            - Metadata about the Hive column
236.      * @return
237.      */  
238. private Object parseMap(Object field, MapTypeInfo fieldTypeInfo) {  
239.         Map<Object, Object> map = (Map<Object, Object>) field;  
240.         TypeInfo valueTypeInfo = fieldTypeInfo.getMapValueTypeInfo();  
241.   
242. for (Map.Entry<Object, Object> entry : map.entrySet()) {  
243.             map.put(entry.getKey(), parseField(entry.getValue(), valueTypeInfo));  
244.         }  
245. return map;  
246.     }  
247.   
248. /**
249.      * Return an ObjectInspector for the row of data
250.      */  
251. @Override  
252. public ObjectInspector getObjectInspector() throws SerDeException {  
253. return rowOI;  
254.     }  
255.   
256. /**
257.      * Unimplemented
258.      */  
259. @Override  
260. public SerDeStats getSerDeStats() {  
261. return null;  
262.     }  
263.   
264. /**
265.      * JSON is just a textual representation, so our serialized class is just
266.      * Text.
267.      */  
268. @Override  
269. public Class<? extends Writable> getSerializedClass() {  
270. return Text.class;  
271.     }  
272.   
273. /**
274.      * This method takes an object representing a row of data from Hive, and
275.      * uses the ObjectInspector to get the data for each column and serialize
276.      * it. This implementation deparses the row into an object that Jackson can
277.      * easily serialize into a JSON blob.
278.      */  
279. @Override  
280. public Writable serialize(Object obj, ObjectInspector oi)  
281. throws SerDeException {  
282.         Object deparsedObj = deparseRow(obj, oi);  
283. new ObjectMapper();  
284. try {  
285. // Let Jackson do the work of serializing the object  
286. return new Text(mapper.writeValueAsString(deparsedObj));  
287. catch (Exception e) {  
288. throw new SerDeException(e);  
289.         }  
290.     }  
291.   
292. /**
293.      * Deparse a Hive object into a Jackson-serializable object. This uses the
294.      * ObjectInspector to extract the column data.
295.      * 
296.      * @param obj
297.      *            - Hive object to deparse
298.      * @param oi
299.      *            - ObjectInspector for the object
300.      * @return - A deparsed object
301.      */  
302. private Object deparseObject(Object obj, ObjectInspector oi) {  
303. switch (oi.getCategory()) {  
304. case LIST:  
305. return deparseList(obj, (ListObjectInspector) oi);  
306. case MAP:  
307. return deparseMap(obj, (MapObjectInspector) oi);  
308. case PRIMITIVE:  
309. return deparsePrimitive(obj, (PrimitiveObjectInspector) oi);  
310. case STRUCT:  
311. return deparseStruct(obj, (StructObjectInspector) oi, false);  
312. case UNION:  
313. // Unsupported by JSON  
314. default:  
315. return null;  
316.         }  
317.     }  
318.   
319. /**
320.      * Deparses a row of data. We have to treat this one differently from other
321.      * structs, because the field names for the root object do not match the
322.      * column names for the Hive table.
323.      * 
324.      * @param obj
325.      *            - Object representing the top-level row
326.      * @param structOI
327.      *            - ObjectInspector for the row
328.      * @return - A deparsed row of data
329.      */  
330. private Object deparseRow(Object obj, ObjectInspector structOI) {  
331. return deparseStruct(obj, (StructObjectInspector) structOI, true);  
332.     }  
333.   
334. /**
335.      * Deparses struct data into a serializable JSON object.
336.      * 
337.      * @param obj
338.      *            - Hive struct data
339.      * @param structOI
340.      *            - ObjectInspector for the struct
341.      * @param isRow
342.      *            - Whether or not this struct represents a top-level row
343.      * @return - A deparsed struct
344.      */  
345. private Object deparseStruct(Object obj, StructObjectInspector structOI,  
346. boolean isRow) {  
347. new HashMap<Object, Object>();  
348. extends StructField> fields = structOI.getAllStructFieldRefs();  
349. for (int i = 0; i < fields.size(); i++) {  
350.             StructField field = fields.get(i);  
351. // The top-level row object is treated slightly differently from  
352. // other  
353. // structs, because the field names for the row do not correctly  
354. // reflect  
355. // the Hive column names. For lower-level structs, we can get the  
356. // field  
357. // name from the associated StructField object.  
358.             String fieldName = isRow ? colNames.get(i) : field.getFieldName();  
359.             ObjectInspector fieldOI = field.getFieldObjectInspector();  
360.             Object fieldObj = structOI.getStructFieldData(obj, field);  
361.             struct.put(fieldName, deparseObject(fieldObj, fieldOI));  
362.         }  
363. return struct;  
364.     }  
365.   
366. /**
367.      * Deparses a primitive type.
368.      * 
369.      * @param obj
370.      *            - Hive object to deparse
371.      * @param oi
372.      *            - ObjectInspector for the object
373.      * @return - A deparsed object
374.      */  
375. private Object deparsePrimitive(Object obj, PrimitiveObjectInspector primOI) {  
376. return primOI.getPrimitiveJavaObject(obj);  
377.     }  
378.   
379. private Object deparseMap(Object obj, MapObjectInspector mapOI) {  
380. new HashMap<Object, Object>();  
381.         ObjectInspector mapValOI = mapOI.getMapValueObjectInspector();  
382.         Map<?, ?> fields = mapOI.getMap(obj);  
383. for (Map.Entry<?, ?> field : fields.entrySet()) {  
384.             Object fieldName = field.getKey();  
385.             Object fieldObj = field.getValue();  
386.             map.put(fieldName, deparseObject(fieldObj, mapValOI));  
387.         }  
388. return map;  
389.     }  
390.   
391. /**
392.      * Deparses a list and its elements.
393.      * 
394.      * @param obj
395.      *            - Hive object to deparse
396.      * @param oi
397.      *            - ObjectInspector for the object
398.      * @return - A deparsed object
399.      */  
400. private Object deparseList(Object obj, ListObjectInspector listOI) {  
401. new ArrayList<Object>();  
402.         List<?> field = listOI.getList(obj);  
403.         ObjectInspector elemOI = listOI.getListElementObjectInspector();  
404. for (Object elem : field) {  
405.             list.add(deparseObject(elem, elemOI));  
406.         }  
407. return list;  
408.     }  
409. }


我稍微修改了一点东西,多加了一个参数input.invalid.ignore,对应的变量为:


//遇到非JSON格式输入的时候的处理。
private boolean ignoreInvalidInput;


在deserialize方法中原来是如果传入的是非JSON格式字符串的话,直接抛出了SerDeException,我加了一个参数来控制它是否抛出异常,在initialize方法中初始化这个变量(默认为false):


// 遇到无法转换成JSON对象的字符串时,是否忽略,默认不忽略,抛出异常,设置为true将跳过异常。
ignoreInvalidInput = Boolean.valueOf(tbl.getProperty(
"input.invalid.ignore", "false"));

好的,现在将这个类打成JAR包: JSONSerDe.jar,放在hive_home的auxlib目录下(我的是/etc/hive/auxlib),然后修改hive-env.sh,添加HIVE_AUX_JARS_PATH=/etc/hive/auxlib/JSONSerDe.jar,这样每次运行hive客户端的时候都会将这个jar包添加到classpath,否则在设置SERDE的时候会报找不到类。

现在我们在HIVE中创建一张表用来存放日志数据:



[plain]  view plain copy


1. create table test(  
2. requestTime BIGINT,  
3. requestParams STRUCT<timestamp:BIGINT,phone:STRING,cardName:STRING,provinceCode:STRING,cityCode:STRING>,    
4. requestUrl STRING)  
5.  row format serde "com.besttone.hive.serde.JSONSerDe"   
6.  WITH SERDEPROPERTIES(  
7.  "input.invalid.ignore"="true",  
8.  "requestTime"="$.requestTime",  
9.  "requestParams.timestamp"="$.requestParams.timestamp",  
10.  "requestParams.phone"="$.requestParams.phone",  
11.  "requestParams.cardName"="$.requestParams.cardName",  
12.  "requestParams.provinceCode"="$.requestParams.provinceCode",  
13.  "requestParams.cityCode"="$.requestParams.cityCode",  
14.  "requestUrl"="$.requestUrl");


这个表结构就是按照日志格式设计的,还记得前面说过的日志数据如下:


{"requestTime":1405651379758,"requestParams":{"timestamp":1405651377211,"phone":"02038824941","cardName":"测试商家名称","provinceCode":"440000","cityCode":"440106"},"requestUrl":"/reporter-api/reporter/reporter12/init.do"}


我使用了一个STRUCT类型来保存requestParams的值,row format我们用的是自定义的json serde:com.besttone.hive.serde.JSONSerDe,SERDEPROPERTIES中,除了设置JSON对象的映射关系外,我还设置了一个自定义的参数:"input.invalid.ignore"="true",忽略掉所有非JSON格式的输入行。这里不是真正意义的忽略,只是非法行的每个输出字段都为NULL了,要在结果集上忽略,必须这样写:select * from test where requestUrl is not null;

OK表建好了,现在就差数据了,我们启动flumedemo的WriteLog,往hive表test目录下面输出一些日志数据,然后在进入hive客户端,select * from test;所以字段都正确的解析,大功告成。

flume.conf如下:



[plain]  
    view plain 
    copy 
    
 
    
 
  
1. tier1.sources=source1  
2. tier1.channels=channel1  
3. tier1.sinks=sink1  
4.   
5. tier1.sources.source1.type=avro  
6. tier1.sources.source1.bind=0.0.0.0  
7. tier1.sources.source1.port=44444  
8. tier1.sources.source1.channels=channel1  
9.   
10. tier1.sources.source1.interceptors=i1 i2  
11. tier1.sources.source1.interceptors.i1.type=regex_filter  
12. tier1.sources.source1.interceptors.i1.regex=\\{.*\\}  
13. tier1.sources.source1.interceptors.i2.type=timestamp  
14.   
15. tier1.channels.channel1.type=memory  
16. tier1.channels.channel1.capacity=10000  
17. tier1.channels.channel1.transactionCapacity=1000  
18. tier1.channels.channel1.keep-alive=30  
19.   
20. tier1.sinks.sink1.type=hdfs  
21. tier1.sinks.sink1.channel=channel1  
22. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/user/hive/warehouse/besttone.db/test  
23. tier1.sinks.sink1.hdfs.fileType=DataStream  
24. tier1.sinks.sink1.hdfs.writeFormat=Text  
25. tier1.sinks.sink1.hdfs.rollInterval=0  
26. tier1.sinks.sink1.hdfs.rollSize=10240  
27. tier1.sinks.sink1.hdfs.rollCount=0  
28. tier1.sinks.sink1.hdfs.idleTimeout=60



besttone.db是我在hive中创建的数据库,了解hive的应该理解没多大问题。



OK,到这篇文章为止,整个从LOG4J生产日志,到flume收集日志,再到用hive离线分析日志,一整套流水线都讲解完了。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  jnZtF7Co41Wg   2023年11月22日   22   0   0 linuxApacheci
  9JCEeX0Eg8g4   2023年12月10日   30   0   0 应用程序javaApache
  KRsXEGSB49bk   2023年11月27日   28   0   0 javaApache
  jnZtF7Co41Wg   2023年11月24日   28   0   0 mysqlApachecentos
  xwGmYGXf1w4S   2023年11月22日   42   0   0 tomcatjavaApache