`

hadoop写入hbase数据

阅读更多

 

 

贴下代码,留作备用,

 

   @Override
    public int run(String[] args) throws Exception {
        Configuration hbaseConf = HBaseConfiguration.create();
       /* String whh = hbaseConf.get("hbase.zookeeper.quorum");
        System.out.print(whh);*/
        Config config = new Config();
        config.initJarFile("mr_hbase.properties");
        String numReduceTasksStr = config.getValue("numReduceTasks");
        Integer numReduceTasks = 3;
        if (NumberUtils.isDigits(numReduceTasksStr)) {
            numReduceTasks = Integer.valueOf(numReduceTasksStr);

        }
        String hbaseZookeeperQuorum = config.getValue("hbase.zookeeper.quorum");
        hbaseConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);

        String hbaseZookeeperPropertyClientPort = config.getValue("hbase.zookeeper.property.clientPort");
        hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort);
        if (args.length > 2) {
            hbaseConf.set("hbase.zookeeper.quorum", args[2]);
        }

        Job job = Job.getInstance(hbaseConf);
        job.setJarByClass(BookKpUnitToHbaseMr.class);
        job.setMapperClass(BookKpUnitToHbaseMr.BookKpUnitToHbaseMapper.class);

        //将第一个路径参数作为输入参数
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //将第二个HBase表参数作为输出参数
        TableMapReduceUtil.initTableReducerJob(args[1], BookKpUnitToHbaseMr.BookKpUnitToHbaseReducer.class, job);  -----> 设置reducer的时候,使用org.apache.hadoop.hbase.mapreduce类

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(StudentKpInfo.class);

        //设置任务个数
        job.setNumReduceTasks(numReduceTasks);
        return job.waitForCompletion(true) ? 0 : 1;
    }  

 

 

 

 

 

 

 

   public static class toHbaseReducer extends TableReducer<Text, StudentScoreInfo, ImmutableBytesWritable> {   ----> hbase的TableReducer设计只接受rowkey,其余的列簇,列名,列值按写入代码时灵活设置,因此这个类只有ImmutableBytesWritable
        @Override
        protected void reduce(Text key, Iterable<StudentScoreInfo> values, Context context) throws IOException, InterruptedException {
            try {
                String rowkey = key.toString();
                Put put = new Put(rowkey.getBytes());
                for (StudentScoreInfo studentScoreInfo : values) {
                    put.addColumn(Bytes.toBytes("es"), Bytes.toBytes(studentScoreInfo.getStudentId()), Bytes.toBytes(studentScoreInfo.getStudentScoreValue()));  // 写入列,参数1分别为 es表示列簇    参数2表示列名  参数3表示列值 
                }
                context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);  // 将rowkey和这一列写入hbase
            } catch (Exception e) {
                logger.error("reduce error: ", e);
            }
        }
    }

 

分享到:
评论

相关推荐

    hadoop数据加解密

    1、随机生成大文本文件(以行方式存储),文件存储在HDFS中,并将文件信息写入HBase中。 2、选择AES加密算法对生成的文件进行加密操作,秘钥长度为128位,加密后的文件存储HDFS中,秘钥写入HBase 3、从HBase中读取...

    Hadoop整合Hbase案列详解

    要求:读取HBase当中user这张表的f1:name、f1:age数据,将数据写入到另外一张user2表的f1列族里面去==*** 第一步:创建表 **注意:**两张表的列族一定要相同 /** create 'user','f1' put 'user','rk001','f1:name...

    Hadoop加解密-完整项目.zip

    1、随机生成大文本文件(以行方式存储),文件存储HDFS中,并将文件信息写入HBASE 2、选择适用的加密算法(SM/AES)对生成的文件进行加密操作,密钥长度128位,加密后的文件存储HDFS中,密钥写入HBASE 3、从HBASE中...

    Hadoop权威指南 第二版(中文版)

     本书从Hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍Hadoop这一高性能处理海量数据集的理想工具。全书共16章,3个附录,涉及的主题包括:Haddoop简介;MapReduce简介;Hadoop分布式文件系统;Hadoop...

    Hadoop权威指南(中文版)2015上传.rar

    使用Hadoop分析数据 map阶段和reduce阶段 横向扩展 合并函数 运行一个分布式的MapReduce作业 Hadoop的Streaming Ruby版本 Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 数据...

    7-分布式数据库HBase.ppt

    因此,业界出现了一类面向半结构化数据存储和处理的高可扩展、低写入/查询延迟的系统,例如,键值数据库、文档数据库和列族数据库(如BigTable和HBase等) HBase已经成功应用于互联网服务领域和传统行业的众多在线式...

    hbase-debugging:HBase问题的各种测试

    经过几次尝试后,它将成功(对hbase上的相同数据进行的所有尝试均不会得到其他写入)。 查看日志,这似乎发生在客户端内存不足且发生了几个完整GC的情况下。 然后,指示从本地客户端缓存中删除区域位置的消息: ...

    Hadoop的数据管理

    前面重点介绍了Hadoop及其体系结构和计算模型MapReduce,现在开始介绍Hadoop的数据管理,主要包括Hadoop的分布式文件系统HDFS、分布式数据库HBase和数据仓库工具Hive的数据管理。HDFS是分布式计算的存储基石,Hadoop...

    spark-examples_2.10-1.6.4-SNAPSHOT.jar

    解决Spark与Hbase版本升级后Pyspark写入,报错找不到StringToImmutableBytesWritableConverte类和找不到:org.apache.hadoop.hbase.client.Put类中的add方法的问题

    论文研究-海量样本数据集中小文件的存取优化研究.pdf

    结合行键优化策略将文件索引存储在HBase数据表中;搭建基于Ehcache缓存框架的预取机制。实验结果表明,该方案降低了主节点的内存消耗,提高了文件的读取效率,实现了对海量样本数据集中小文件的高效存取。

    hbase-exporter:HBase Prometheus导出器

    例如“过渡中的过时区域” 解析“ hbase hbck”命令的输出以检查HBase中的不一致情况将Hbase标记为不健康要求满足以下条件之一至少有一个陈旧的区域正在过渡'hbase hbck'命令显示HBase不一致对预定义表的写入不成功...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    基于spark+flume+kafka+hbase的实时日志处理分析系统.zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    基于云平台的分布式新能源监控数据分析方法研究

    完成了基于HBase数据库的并行数据采集系统的模块设计、基于SCAD系统的结构化数据存储模块设计、基于MapReduce方法的分布式数据处理方案设计,解决了数据量剧增导致的存储空间不足问题以及各级单位数据交互难的问题,...

    XX能源云数据平台建设项目_投标书_技术部分_V1.0.doc

     支持数据采集写入到HDFS、HBase、Hive等大数据库中。  支持准实时数据的采集和处理。 数据存储处理:  支持原来BW数据仓库的数据逐层转换处理。  支持Hadoop大数据库中进行分布式海量数据的并行处理。  ...

    基于hbase + spark 实现常用推荐算法(主要用于精准广告投放和推荐系统).zip

    两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive+Apache Hive数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。 可以将结构投影到已经存储的数据上。 提供了命令行工具和...

    key-value-store-indexing:HBase上的二级索引

    它当前支持全局索引,并且适用于类似HBase的NoSQL系统,在该系统中,通过LSM树结构[]优化了写入性能。 DELI代表“延迟的轻量级索引”。 它的独特设计(考虑到HBase和NoSQL系统还有其他二级索引)是: 严格遵循日志...

    Deferred-Lightweight-Indexing-for-Log-Structured-Key-Value-Stores:HBase http上的二级索引

    它当前支持全局索引,并且适用于类似HBase的NoSQL系统,在该系统中,通过LSM树结构[]优化了写入性能。 DELI代表“延迟的轻量级索引”。 它的独特设计(考虑到HBase和NoSQL系统还有其他二级索引)是: 严格遵循日志...

    Spark学习笔记

    spark 可以很容易和yarn结合,直接调用HDFS、Hbase上面的数据,和hadoop结合。配置很容易。 spark发展迅猛,框架比hadoop更加灵活实用。减少了延时处理,提高性能效率实用灵活性。也可以与hadoop切实相互结合。 ...

Global site tag (gtag.js) - Google Analytics