Basic Java main-method approach: moving data from Kafka to HBase
- 1. Create the hbase package
- Write the EventAttendeeshb Java class
- Write the UserFriendshb Java class
- Enter the hbase shell to check the namespace and tables
- Then verify the data in HBase
1. Create the hbase package (the classes below live in nj.zb.hbase)
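The original post does not show the build file. Purely as a sketch (the artifact versions below are placeholders and should be matched to your installed Kafka and HBase), the two client dependencies this code relies on are kafka-clients and hbase-client:

```xml
<!-- versions are placeholders: align them with your installed Kafka / HBase -->
<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.6</version>
  </dependency>
</dependencies>
```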
Write the EventAttendeeshb Java class
```java
package nj.zb.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * @author: 03-CJH
 * @date: 2020/5/28
 * @desc: consume the event_attendees topic and write it into events_db:event_attendees
 */
public class EventAttendeeshb {
    public static void main(String[] args) {
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "eventattendees1");
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.48.141:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("event_attendees"));

        // HBase connection configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.48.141");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://192.168.48.141:9000/hbase");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf("events_db:event_attendees"));
            while (true) {
                List<Put> datas = new ArrayList<>();
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                    // each record is "eventid,friendid,status"
                    String[] info = record.value().split(",");
                    Put put = new Put(Bytes.toBytes((info[0] + info[1] + info[2]).hashCode()));
                    put.addColumn("euat".getBytes(), "eventid".getBytes(), info[0].getBytes());
                    put.addColumn("euat".getBytes(), "friendid".getBytes(), info[1].getBytes());
                    put.addColumn("euat".getBytes(), "status".getBytes(), info[2].getBytes());
                    datas.add(put);
                }
                table.put(datas);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
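A note on the row key design: deriving the key from the record itself means replaying the topic just rewrites the same rows, so re-consumption stays idempotent. The downside is that `Bytes.toBytes(...hashCode())` produces only a 4-byte key, so different records can collide and silently overwrite each other. A hypothetical alternative (my addition, not from the original code) is to use an MD5 hex digest of the whole line via HBase's own `MD5Hash` utility:

```java
package nj.zb.hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;

/**
 * Hypothetical helper, not part of the original post: builds a Put whose
 * row key is the MD5 hex digest of the raw record, which is longer and
 * far less collision-prone than a 4-byte hashCode().
 */
public class RowKeys {
    public static Put putFor(String rawRecord) {
        String rowKey = MD5Hash.getMD5AsHex(Bytes.toBytes(rawRecord));
        return new Put(Bytes.toBytes(rowKey)); // caller then calls addColumn(...) as before
    }
}
```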
Write the UserFriendshb Java class
```java
package nj.zb.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * @author: 03-CJH
 * @date: 2020/5/28
 * @desc: consume the user_friends topic and write it into events_db:user_friends
 */
public class UserFriendshb {
    public static void main(String[] args) {
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "userfriends1");
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.48.141:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("user_friends"));

        // HBase connection configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.48.141");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://192.168.48.141:9000/hbase");

        try {
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf("events_db:user_friends"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                List<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                    // each record is "userid,friendid"
                    String[] info = record.value().split(",");
                    Put put = new Put(Bytes.toBytes((info[0] + info[1]).hashCode()));
                    put.addColumn("uf".getBytes(), "userid".getBytes(), info[0].getBytes());
                    put.addColumn("uf".getBytes(), "friendid".getBytes(), info[1].getBytes());
                    datas.add(put);
                }
                table.put(datas);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
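The two classes above differ only in the consumer group, topic, target table, and how a CSV line becomes a `Put`. Purely as a sketch (not part of the original code, and assuming kafka-clients 2.x for the `Duration`-based `poll`), the shared loop could be factored out; this variant also commits offsets manually after each successful batch write, since `enable.auto.commit` is set to false above:

```java
package nj.zb.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Function;

/**
 * Hypothetical refactoring, not part of the original post: the shared
 * Kafka -> HBase loop, parameterized by group id, topic, target table
 * and a per-record value -> Put mapper.
 */
public class KafkaToHBaseWorker {
    private final String groupId;
    private final String topic;
    private final String tableName;
    private final Function<String, Put> mapper;

    public KafkaToHBaseWorker(String groupId, String topic, String tableName,
                              Function<String, Put> mapper) {
        this.groupId = groupId;
        this.topic = topic;
        this.tableName = tableName;
        this.mapper = mapper;
    }

    public void run() throws IOException {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.48.141:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.48.141");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
             Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            consumer.subscribe(Collections.singleton(topic));
            while (true) {
                // Duration-based poll requires kafka-clients 2.0 or newer
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                List<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    datas.add(mapper.apply(record.value()));
                }
                if (!datas.isEmpty()) {
                    table.put(datas);
                    // commit offsets only after HBase has accepted the batch
                    consumer.commitSync();
                }
            }
        }
    }

    // Example usage: the same user_friends job as UserFriendshb above
    public static void main(String[] args) throws IOException {
        new KafkaToHBaseWorker("userfriends1", "user_friends", "events_db:user_friends", value -> {
            String[] info = value.split(",");
            Put put = new Put(Bytes.toBytes((info[0] + info[1]).hashCode()));
            put.addColumn("uf".getBytes(), "userid".getBytes(), info[0].getBytes());
            put.addColumn("uf".getBytes(), "friendid".getBytes(), info[1].getBytes());
            return put;
        }).run();
    }
}
```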
2. Run
Running the other class works the same way.
3. Open Xshell and check the results
ps: the hadoop, zookeeper, hbase, and kafka services need to be started first
```shell
# hadoop
start-all.sh
# zookeeper
zkServer.sh start
# hbase
start-hbase.sh
# kafka (-daemon runs it in the background)
kafka-server-start.sh -daemon /opt/bigdata/kafka211/config/server.properties
```
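A quick way to confirm everything is up is `jps`; roughly the daemons below should be listed, though the exact set depends on your setup:

```shell
[root@cjh1 ~]# jps
# expected (names, not in any particular order):
#   NameNode / DataNode / SecondaryNameNode  -> hadoop
#   QuorumPeerMain                           -> zookeeper
#   HMaster / HRegionServer                  -> hbase
#   Kafka                                    -> kafka
```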
Enter the hbase shell and check the namespace and tables
ps: of course, first make sure the namespace and tables actually exist in HBase
```shell
# enter the hbase shell
[root@cjh1 ~]# hbase shell
hbase> list_namespace
# if the namespace does not exist yet, create it along with the table
hbase> create_namespace 'events_db'
hbase> create 'events_db:user_friends','uf'
```
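The EventAttendeeshb class above writes to `events_db:event_attendees` with column family `euat`, so that table has to exist as well; if it is missing, create it the same way:

```shell
hbase> create 'events_db:event_attendees','euat'
```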
Then verify the data in HBase
ps: the hbase shell `count` command runs very slowly, so I recommend the following command instead:
```shell
[root@cjh1 ~]# hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'events_db:user_friends'
```
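Besides counting rows, a quick limited scan is a handy spot check that the column family and qualifiers landed as expected (standard hbase shell syntax, not from the original post):

```shell
hbase> scan 'events_db:user_friends', {LIMIT => 5}
```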
ps: with this, loading data from Kafka into HBase using a plain main method is complete!
ps: thanks for your support; more articles are on the way…