Big Data Persistence with Gora
  5HAFzPFxGStu, November 2, 2023


The gora-demo project is hosted on GitHub.

 

wget http://mirrors.cnnic.cn/apache/gora/0.3/apache-gora-0.3-src.zip

unzip apache-gora-0.3-src.zip

cd apache-gora-0.3

mvn clean package

1. Create the project

mvn archetype:create -DgroupId=org.apdplat.demo.gora -DartifactId=gora-demo

2. Add dependencies

vi gora-demo/pom.xml

Add the following inside the <dependencies> element:

   

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94.12</version>
</dependency>
<dependency>
    <groupId>org.apache.gora</groupId>
    <artifactId>gora-core</artifactId>
    <version>0.3</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-frontend-jaxrs</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.gora</groupId>
    <artifactId>gora-hbase</artifactId>
    <version>0.3</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-test</artifactId>
        </exclusion>
    </exclusions>
</dependency>

3. Model the data

mkdir -p gora-demo/src/main/avro

vi gora-demo/src/main/avro/person.json

Enter:

   

{
    "type": "record",
    "name": "Person",
    "namespace": "org.apdplat.demo.gora.generated",
    "fields": [
        {"name": "idcard", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "age", "type": "string"}
    ]
}

4. Generate the Java class

bin/gora  goracompiler  gora-demo/src/main/avro/person.json  gora-demo/src/main/java/

5. Map the model

mkdir -p gora-demo/src/main/resources/

vi gora-demo/src/main/resources/gora-hbase-mapping.xml

Enter:

      

<gora-orm>
    <table name="Person">
        <family name="basic"/>
        <family name="detail"/>
    </table>
    <class table="Person" name="org.apdplat.demo.gora.generated.Person" keyClass="java.lang.String">
        <field name="idcard" family="basic" qualifier="idcard"/>
        <field name="name" family="basic" qualifier="name"/>
        <field name="age" family="detail" qualifier="age"/>
    </class>
</gora-orm>
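
To make the mapping concrete, here is a small plain-Java sketch of what the file declares: each Person field resolves to one HBase column, written as family:qualifier. It does not use Gora or HBase at all; MappingSketch and columnFor are made-up names for illustration only, and the table is simply copied from the <field> elements above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of the field-to-column mapping in gora-hbase-mapping.xml.
// MappingSketch and columnFor are hypothetical names, not part of the Gora API.
class MappingSketch {

    // field name -> {column family, qualifier}, copied from the <field> elements
    private static final Map<String, String[]> FIELDS = new LinkedHashMap<String, String[]>();
    static {
        FIELDS.put("idcard", new String[] {"basic", "idcard"});
        FIELDS.put("name", new String[] {"basic", "name"});
        FIELDS.put("age", new String[] {"detail", "age"});
    }

    // Returns the column as "family:qualifier", or null if the field is not mapped.
    static String columnFor(String field) {
        String[] column = FIELDS.get(field);
        return column == null ? null : column[0] + ":" + column[1];
    }

    public static void main(String[] args) {
        for (String field : FIELDS.keySet()) {
            System.out.println(field + " -> " + columnFor(field));
        }
    }
}
```

Running it lists the three fields with their columns, which is a quick way to see that idcard and name share the basic family while age lives in detail.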

6. Configure Gora

vi gora-demo/src/main/resources/gora.properties

Enter:

      gora.datastore.default=org.apache.gora.hbase.store.HBaseStore

      gora.datastore.autocreateschema=true
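
gora.properties is an ordinary Java properties file, so a typo in a key is easy to catch before starting HBase. The following is only a sketch using java.util.Properties from the JDK; GoraPropertiesSketch, load and autoCreateSchema are hypothetical helper names, and the sample text is the two lines above rather than the file read from disk.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Sanity check for the two keys used in this demo. GoraPropertiesSketch is a
// hypothetical helper, not a Gora class; Gora itself reads the file from the classpath.
class GoraPropertiesSketch {

    static final String SAMPLE =
            "gora.datastore.default=org.apache.gora.hbase.store.HBaseStore\n"
            + "gora.datastore.autocreateschema=true\n";

    // Parses properties text; wraps the checked IOException for easier calling.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not read properties", e);
        }
        return props;
    }

    // True only when the key is present and set to "true" (case-insensitive).
    static boolean autoCreateSchema(Properties props) {
        return Boolean.parseBoolean(props.getProperty("gora.datastore.autocreateschema"));
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println("store class: " + props.getProperty("gora.datastore.default"));
        System.out.println("auto create schema: " + autoCreateSchema(props));
    }
}
```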

7. Configure HBase

vi gora-demo/src/main/resources/hbase-site.xml

Enter:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
<configuration>
 <property>
   <name>hbase.zookeeper.property.clientPort</name>
   <value>2181</value>
 </property>
 <property>
   <name>hbase.zookeeper.quorum</name>
   <value>host001</value>
 </property>
</configuration>
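
The two properties tell the HBase client where ZooKeeper is: the quorum host and the client port. As a rough illustration, the sketch below parses the same name/value pairs with the JDK's DOM parser and joins them into a host:port string. HBaseSiteSketch, parse and zookeeperAddress are hypothetical names, the sample omits the stylesheet line, and real HBase code reads this file through its own Configuration classes rather than like this.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Reads <property><name/><value/></property> pairs from hbase-site.xml style text.
// HBaseSiteSketch is a hypothetical helper, not part of HBase or Hadoop.
class HBaseSiteSketch {

    static final String SAMPLE =
            "<?xml version=\"1.0\"?>\n"
            + "<configuration>\n"
            + " <property>\n"
            + "   <name>hbase.zookeeper.property.clientPort</name>\n"
            + "   <value>2181</value>\n"
            + " </property>\n"
            + " <property>\n"
            + "   <name>hbase.zookeeper.quorum</name>\n"
            + "   <value>host001</value>\n"
            + " </property>\n"
            + "</configuration>\n";

    // Returns the properties in document order.
    static Map<String, String> parse(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            Map<String, String> props = new LinkedHashMap<String, String>();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element property = (Element) nodes.item(i);
                String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
                props.put(name, value);
            }
            return props;
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse configuration", e);
        }
    }

    // Joins quorum host and client port, for example to test connectivity by hand.
    static String zookeeperAddress(Map<String, String> props) {
        return props.get("hbase.zookeeper.quorum") + ":"
                + props.get("hbase.zookeeper.property.clientPort");
    }

    public static void main(String[] args) {
        System.out.println(zookeeperAddress(parse(SAMPLE)));
    }
}
```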

8. Write PersonManager.java and PersonAnalytics.java

vi gora-demo/src/main/java/org/apdplat/demo/gora/PersonManager.java

    Enter:

package org.apdplat.demo.gora;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import org.apache.avro.util.Utf8;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.hadoop.conf.Configuration;
import org.apdplat.demo.gora.generated.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersonManager {
    private static final Logger log = LoggerFactory.getLogger(PersonManager.class);
    private DataStore<String, Person> dataStore;

    public PersonManager() {
        try {
            init();
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    // Creates the data store configured in gora.properties (HBaseStore in this demo).
    private void init() throws IOException {
        Configuration conf = new Configuration();
        dataStore = DataStoreFactory.getDataStore(String.class, Person.class, conf);
    }
    // Reads the input file line by line and stores one Person per well-formed line.
    private void parse(String input) throws IOException, ParseException, Exception {
        log.info("Parsing file: " + input);
        BufferedReader reader = new BufferedReader(new FileReader(input));
        long personCount = 0;
        try {
            String line;
            // Loop until end of file; an empty file simply stores nothing.
            while ((line = reader.readLine()) != null) {
                Person person = parseLine(line);
                if (person != null) {
                    // Store the record, keyed by ID card number
                    storePerson(person.getIdcard().toString(), person);
                    personCount++;
                }
            }
        } finally {
            reader.close();
        }
        log.info("Finished parsing file. Total persons: " + personCount);
    }

    // Expected line format: "<idcard> <name> <age>". Returns null for blank or malformed lines.
    private Person parseLine(String line) throws ParseException {
        String[] attrs = line.trim().split("\\s+");
        if (attrs.length < 3) {
            return null;
        }
        String idcard = attrs[0];
        String name = attrs[1];
        String age = attrs[2];

        Person person = new Person();
        person.setIdcard(new Utf8(idcard));
        person.setName(new Utf8(name));
        person.setAge(new Utf8(age));

        return person;
    }

    private void storePerson(String key, Person person) throws IOException, Exception {
        log.info("Saving person: " + person.getIdcard() + "\t" + person.getName() + "\t" + person.getAge());
        dataStore.put(key, person);
    }
    // Fetches a single record by key.
    private void get(String key) throws IOException, Exception {
        Person person = dataStore.get(key);
        printPerson(person);
    }

    // Queries a single key.
    private void query(String key) throws IOException, Exception {
        Query<String, Person> query = dataStore.newQuery();
        query.setKey(key);

        Result<String, Person> result = query.execute();

        printResult(result);
    }

    // Queries a key range.
    private void query(String startKey, String endKey) throws IOException, Exception {
        Query<String, Person> query = dataStore.newQuery();
        query.setStartKey(startKey);
        query.setEndKey(endKey);

        Result<String, Person> result = query.execute();

        printResult(result);
    }

    private void delete(String key) throws Exception {
        dataStore.delete(key);
        dataStore.flush();
        log.info("Deleted the person with ID card number " + key);
    }

    private void deleteByQuery(String startKey, String endKey) throws IOException, Exception {
        Query<String, Person> query = dataStore.newQuery();
        query.setStartKey(startKey);
        query.setEndKey(endKey);

        dataStore.deleteByQuery(query);
        log.info("Deleted persons with ID card numbers from " + startKey + " to " + endKey);
    }

    private void printResult(Result<String, Person> result) throws IOException, Exception {
        while (result.next()) {
            String resultKey = result.getKey();
            Person resultPerson = result.get();

            System.out.println(resultKey + ":");
            printPerson(resultPerson);
        }

        System.out.println("Number of persons: " + result.getOffset());
    }

    private void printPerson(Person person) {
        if (person == null) {
            System.out.println("No result");
        } else {
            System.out.println(person.getIdcard() + "\t" + person.getName() + "\t" + person.getAge());
        }
    }

    private void close() throws IOException, Exception {
        if (dataStore != null) {
            dataStore.close();
        }
    }
    private static final String USAGE = "PersonManager -parse <input_person_file>\n" +
                                        "          -get <idcard>\n" +
                                        "          -query <idcard>\n" +
                                        "          -query <startIdcard> <endIdcard>\n" +
                                        "          -delete <idcard>\n" +
                                        "          -deleteByQuery <startIdcard> <endIdcard>\n";

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println(USAGE);
            System.exit(1);
        }

        PersonManager manager = new PersonManager();

        if ("-parse".equals(args[0])) {
            manager.parse(args[1]);
        } else if ("-get".equals(args[0])) {
            manager.get(args[1]);
        } else if ("-query".equals(args[0])) {
            if (args.length == 2) {
                manager.query(args[1]);
            } else {
                manager.query(args[1], args[2]);
            }
        } else if ("-delete".equals(args[0])) {
            manager.delete(args[1]);
        } else if ("-deleteByQuery".equalsIgnoreCase(args[0]) && args.length >= 3) {
            // Both the start and the end key are required for a range delete
            manager.deleteByQuery(args[1], args[2]);
        } else {
            System.err.println(USAGE);
            System.exit(1);
        }

        manager.close();
    }
}
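
The range operations in PersonManager rely on keys being stored in sorted order, so a start key and an end key select a contiguous slice of rows. Here is a minimal in-memory sketch of that idea using a TreeMap in place of HBase. RangeQuerySketch and its methods are hypothetical names, not Gora classes. Whether the end key itself is part of the range depends on the backing store, so the sketch takes that as a flag instead of claiming what Gora does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory stand-in for a key-sorted table. RangeQuerySketch is a hypothetical
// name; nothing here talks to Gora or HBase.
class RangeQuerySketch {

    private final NavigableMap<String, String> rows = new TreeMap<String, String>();

    // Loads the seven ID card numbers used in this tutorial (…839 to …845).
    static RangeQuerySketch withSampleKeys() {
        RangeQuerySketch store = new RangeQuerySketch();
        for (int suffix = 839; suffix <= 845; suffix++) {
            store.rows.put("533001198510125" + suffix, "person-" + suffix);
        }
        return store;
    }

    // Keys in [startKey, endKey] or [startKey, endKey), depending on endInclusive.
    List<String> query(String startKey, String endKey, boolean endInclusive) {
        return new ArrayList<String>(rows.subMap(startKey, true, endKey, endInclusive).keySet());
    }

    // Removes every key in the range and returns how many were removed.
    int deleteByQuery(String startKey, String endKey, boolean endInclusive) {
        NavigableMap<String, String> range = rows.subMap(startKey, true, endKey, endInclusive);
        int removed = range.size();
        range.clear(); // the sub-map is a view, so this deletes from rows
        return removed;
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        RangeQuerySketch store = withSampleKeys();
        System.out.println(store.query("533001198510125842", "533001198510125845", true));
        System.out.println("removed: " + store.deleteByQuery("533001198510125841", "533001198510125842", true));
        System.out.println("remaining: " + store.size());
    }
}
```

If a range query against the real store returns one row fewer than expected, the end-key behavior is the first thing worth checking.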
vi gora-demo/src/main/java/org/apdplat/demo/gora/PersonAnalytics.java

    Enter:

package org.apdplat.demo.gora;

import java.io.IOException;

import org.apache.avro.util.Utf8;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apdplat.demo.gora.generated.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersonAnalytics extends Configured implements Tool {
    private static final Logger log = LoggerFactory
            .getLogger(PersonAnalytics.class);

    // Emits (age, 1) for every Person read from the data store.
    public static class PersonAnalyticsMapper extends
            GoraMapper<String, Person, Text, LongWritable> {
        private LongWritable one = new LongWritable(1L);

        @Override
        protected void map(String key, Person person, Context context)
                throws IOException, InterruptedException {
            Utf8 age = person.getAge();
            context.write(new Text(age.toString()), one);
        }
    }
 
    // Sums the counts for each age.
    public static class PersonAnalyticsReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public Job createJob(DataStore<String, Person> inStore, int numReducer)
            throws IOException {
        Job job = new Job(getConf());
        job.setJobName("Person Analytics");
        log.info("Creating Hadoop Job: " + job.getJobName());
        job.setNumReduceTasks(numReducer);
        job.setJarByClass(getClass());
        // Read input from the Gora data store instead of HDFS files
        GoraMapper.initMapperJob(job, inStore, Text.class, LongWritable.class,
                PersonAnalyticsMapper.class, true);
        job.setReducerClass(PersonAnalyticsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        TextOutputFormat
                .setOutputPath(job, new Path("person-analytics-output"));
        return job;
    }
 
    @Override
    public int run(String[] args) throws Exception {
        DataStore<String, Person> inStore;
        Configuration conf = new Configuration();
        if (args.length == 1) {
            // Optional argument: the data store class to use
            String dataStoreClass = args[0];
            inStore = DataStoreFactory.getDataStore(dataStoreClass,
                    String.class, Person.class, conf);
        } else {
            inStore = DataStoreFactory.getDataStore(String.class, Person.class,
                    conf);
        }
        Job job = createJob(inStore, 2);
        boolean success = job.waitForCompletion(true);
        inStore.close();
        log.info("PersonAnalytics completed with "
                + (success ? "success" : "failure"));
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new PersonAnalytics(), args);
        System.exit(ret);
    }
}

9. Prepare the data

vi gora-demo/src/main/resources/persons.txt

    Enter:

533001198510125839 杨尚川 25
533001198510125840 杨尚华 22
533001198510125841 刘德华 55
533001198510125842 刘亦菲 25
533001198510125843 蔡卓妍 25
533001198510125844 林志玲 22
533001198510125845 李连杰 55
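
For a feel of what PersonAnalytics computes over these seven records, the sketch below does the same count-by-age in plain Java, with no Hadoop involved: the "map" step emits the age of each record and the "reduce" step sums the occurrences. AgeCountSketch is a hypothetical name, and the sample array holds only the ID card number and age of each record above (the names are left out because the count does not use them).

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java version of the count-by-age aggregation. AgeCountSketch is a
// hypothetical name; the real job runs as Hadoop MapReduce over the Gora store.
class AgeCountSketch {

    // {idcard, age} for the seven sample records in persons.txt
    static final String[][] SAMPLE = {
        {"533001198510125839", "25"},
        {"533001198510125840", "22"},
        {"533001198510125841", "55"},
        {"533001198510125842", "25"},
        {"533001198510125843", "25"},
        {"533001198510125844", "22"},
        {"533001198510125845", "55"},
    };

    static Map<String, Long> countByAge(String[][] records) {
        Map<String, Long> counts = new TreeMap<String, Long>();
        for (String[] record : records) {
            String age = record[1];                                  // "map": emit (age, 1)
            Long current = counts.get(age);
            counts.put(age, current == null ? 1L : current + 1L);    // "reduce": sum per age
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same key<TAB>value layout that TextOutputFormat writes
        for (Map.Entry<String, Long> entry : countByAge(SAMPLE).entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
```

For the sample data this gives 2 people aged 22, 3 aged 25 and 2 aged 55, which is a handy reference when checking the job output later.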

10. Build and run the project from the Linux command line with Maven 2

cd gora-demo
mvn clean compile
mvn exec:java -Dexec.mainClass=org.apdplat.demo.gora.PersonManager
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-parse src/main/resources/persons.txt"
mvn exec:java -Dexec.mainClass=org.apdplat.demo.gora.PersonAnalytics
cat person-analytics-output/part-r-00000
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-get 533001198510125842"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-query 533001198510125844"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-query 533001198510125842 533001198510125845"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-delete 533001198510125840"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-deleteByQuery 533001198510125841 533001198510125842"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-deleteByQuery 533001198510125845 533001198510125846"
mvn exec:java -Dexec.mainClass="org.apdplat.demo.gora.PersonManager" -Dexec.args="-query 533001198510125838 533001198510125848"

11. Build and run the project in Eclipse on Windows

mvn clean package

rm -r target

vi .classpath

Delete every line that contains path="M2_REPO

Delete <classpathentry kind="src" path="target/maven-shared-archive-resources" excluding="**/*.java"/>

Copy gora-demo to Windows with WinSCP

Download the modified hadoop-core-1.2.1.jar from http://yangshangchuan.iteye.com/blog/1839784 and use it to replace gora-demo\lib\hadoop-core-1.2.1.jar

Import gora-demo into Eclipse

Add all jars under lib to the build path

 

12. Package the project and submit it to Hadoop

cd gora-demo

mvn clean package

mkdir job

cp -r lib job/lib

cp -r target/classes/* job

hadoop fs -put persons.txt persons.txt

jar -cvf gora-demo.job *

hadoop jar gora-demo.job org.apdplat.demo.gora.PersonAnalytics 



Last edited on November 8, 2023