
Uploading Data from Kafka to HBase (Using a main Method)

A basic Java main-method approach: moving data from Kafka to HBase.

  • 1. Create the hbase package
  • Write the EventAttendeeshb Java class
  • Write the UserFriendshb Java class
  • 2. Run
  • 3. Check the results in Xshell
    • Enter the HBase shell and check the namespace and tables
    • Then count the rows in HBase

    1. Create the hbase package

    Write the EventAttendeeshb Java class

    package nj.zb.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    /**
     * @author: 03-CJH
     * @date: 2020/5/28
     * @desc: Reads the event_attendees topic from Kafka and writes the records into HBase.
     */
    public class EventAttendeeshb {
        public static void main(String[] args) {
            // Kafka consumer configuration
            Properties prop = new Properties();
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, "eventattendees1");
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.48.141:9092");
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            consumer.subscribe(Collections.singleton("event_attendees"));

            // HBase client configuration
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.48.141");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.rootdir", "hdfs://192.168.48.141:9000/hbase");

            try {
                Connection connection = ConnectionFactory.createConnection(conf);
                Table table = connection.getTable(TableName.valueOf("events_db:event_attendees"));
                while (true) {
                    List<Put> datas = new ArrayList<>();
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                        // Each message is expected to be "eventid,friendid,status"
                        String[] info = record.value().split(",");
                        // Row key: hashCode of the concatenated fields
                        Put put = new Put(Bytes.toBytes((info[0] + info[1] + info[2]).hashCode()));
                        put.addColumn("euat".getBytes(), "eventid".getBytes(), info[0].getBytes());
                        put.addColumn("euat".getBytes(), "friendid".getBytes(), info[1].getBytes());
                        put.addColumn("euat".getBytes(), "status".getBytes(), info[2].getBytes());
                        datas.add(put);
                    }
                    table.put(datas);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    Write the UserFriendshb Java class

    package nj.zb.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    /**
     * @author: 03-CJH
     * @date: 2020/5/28
     * @desc: Reads the user_friends topic from Kafka and writes the records into HBase.
     */
    public class UserFriendshb {
        public static void main(String[] args) {
            // Kafka consumer configuration
            Properties prop = new Properties();
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, "userfriends1");
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.48.141:9092");
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            consumer.subscribe(Collections.singleton("user_friends"));

            // HBase client configuration
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.48.141");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.rootdir", "hdfs://192.168.48.141:9000/hbase");

            try {
                Connection connection = ConnectionFactory.createConnection(conf);
                Table table = connection.getTable(TableName.valueOf("events_db:user_friends"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    List<Put> datas = new ArrayList<>();
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                        // Each message is expected to be "userid,friendid"
                        String[] info = record.value().split(",");
                        // Row key: hashCode of the concatenated fields
                        Put put = new Put(Bytes.toBytes((info[0] + info[1]).hashCode()));
                        put.addColumn("uf".getBytes(), "userid".getBytes(), info[0].getBytes());
                        put.addColumn("uf".getBytes(), "friendid".getBytes(), info[1].getBytes());
                        datas.add(put);
                    }
                    table.put(datas);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
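    Note on the design of both classes: ENABLE_AUTO_COMMIT is "false" and offsets are never committed manually, so with AUTO_OFFSET_RESET set to earliest every restart re-reads the topic from the beginning. Because the row key is the deterministic hashCode of the concatenated fields, replayed records simply overwrite the same HBase rows instead of creating duplicates.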

    2. Run


    The other class is run in exactly the same way.

    3. Check the results in Xshell

    PS: The Hadoop, ZooKeeper, HBase, and Kafka services need to be started first.

    # hadoop
    start-all.sh
    # zookeeper
    zkServer.sh start
    # hbase
    start-hbase.sh
    # kafka (-daemon runs it in the background)
    kafka-server-start.sh -daemon /opt/bigdata/kafka211/config/server.properties
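    A quick way to confirm that everything came up is jps; the process names below are typical but can vary with your Hadoop/HBase/Kafka versions, so treat this as a sketch:

    # Check the running Java daemons
    [root@cjh1 ~]# jps
    # Expect processes such as NameNode/DataNode (Hadoop), QuorumPeerMain (ZooKeeper),
    # HMaster/HRegionServer (HBase) and Kafka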

    Enter the HBase shell and check the namespace and tables

    PS: Of course, you first need to make sure the namespace and tables exist in HBase.

    # Enter the HBase shell
    [root@cjh1 ~]# hbase shell
    hbase> list_namespace
    # If they do not exist yet, create them
    hbase> create_namespace 'events_db'
    hbase> create 'events_db:user_friends','uf'
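    The EventAttendeeshb class above writes to events_db:event_attendees with column family euat, so that table must exist as well; the matching create statement follows the same pattern:

    hbase> create 'events_db:event_attendees','euat'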

    Then count the rows in HBase

    PS: Because the count command in the HBase shell runs very slowly, I recommend the following command instead:

    [root@cjh1 ~]# hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'events_db:user_friends'
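    To spot-check the actual data rather than just the row count, a short scan in the HBase shell also works (a quick sketch; adjust the LIMIT as needed):

    hbase> scan 'events_db:user_friends', {LIMIT => 5}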

    PS: With that, uploading data from Kafka to HBase using a main method is complete!

    PS: I hope you find this useful; follow-up articles are still being updated…
