MySQL数据库连接池(使用的数据源是阿里巴巴的德鲁伊,即 Druid)
<!--导入阿里巴巴的德鲁伊--><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.23</version></dependency>
package cn._51doit.spark.utils

import java.sql.Connection
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

/**
 * Process-wide MySQL connection source built with Alibaba Druid's
 * `DruidDataSourceFactory`.
 *
 * The JDBC driver class, URL and credentials are fixed in this object; the
 * data source is created once, when the object is first initialised.
 */
object DruidConnectionPool {

  /** Settings handed to `DruidDataSourceFactory.createDataSource`. */
  private val settings: Properties = {
    val p = new Properties()
    p.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    p.setProperty("url", "jdbc:mysql://localhost:3306/doit_16?characterEncoding=UTF-8")
    p.setProperty("username", "root")
    p.setProperty("password", "123456")
    p
  }

  /** The single data source every caller obtains its connections from. */
  private val pool: DataSource = DruidDataSourceFactory.createDataSource(settings)

  /**
   * Obtains a connection from the data source.
   *
   * @return a JDBC connection; the caller is responsible for closing it
   */
  def getConnection: Connection = pool.getConnection
}
读取偏移量的工具类
(读取MySQL中存放的偏移量)
package cn._51doit.spark.utils

import java.sql.{Connection, PreparedStatement, ResultSet}

import org.apache.kafka.common.TopicPartition

import scala.collection.mutable

/** Helpers for reading Kafka offsets that were persisted in MySQL. */
object OffsetUtils {

  /**
   * Loads the offsets stored in table `t_kafka_offset` for one application /
   * consumer-group pair.
   *
   * Rows are selected by `app_gid = appId + "_" + groupId`. Each row's
   * `topic_partition` column is expected to look like `<topic>_<partition>`.
   *
   * @param appId   application id, first half of the `app_gid` key
   * @param groupId Kafka consumer group id, second half of the `app_gid` key
   * @return an immutable map from topic-partition to the stored offset; empty
   *         when no rows match
   * @throws RuntimeException if the query or the parsing of a row fails; the
   *                          underlying exception is attached as the cause
   */
  def queryHistoryOffsetFromMySQL(appId: String, groupId: String): Map[TopicPartition, Long] = {
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    var connection: Connection = null
    var statement: PreparedStatement = null
    var resultSet: ResultSet = null
    try {
      connection = DruidConnectionPool.getConnection
      statement = connection.prepareStatement(
        "SELECT topic_partition, offset FROM t_kafka_offset WHERE app_gid = ?")
      statement.setString(1, appId + "_" + groupId)
      // Assign to the outer variable (do not redeclare it) so that the
      // finally block below actually sees and closes this result set.
      resultSet = statement.executeQuery()
      // Walk the result set, one row per topic-partition.
      while (resultSet.next()) {
        val topicPartition = parseTopicPartition(resultSet.getString(1))
        offsetMap(topicPartition) = resultSet.getLong(2)
      }
    } catch {
      case e: Exception =>
        // Keep the original message, but preserve the root cause for debugging.
        throw new RuntimeException("查询历史偏移量出现异常", e)
    } finally {
      // Nested try/finally: a failing close() must not prevent the remaining
      // resources from being closed.
      try {
        if (resultSet != null) resultSet.close()
      } finally {
        try {
          if (statement != null) statement.close()
        } finally {
          if (connection != null) connection.close()
        }
      }
    }
    offsetMap.toMap
  }

  /**
   * Splits a stored `<topic>_<partition>` key on its LAST underscore, so that
   * topic names which themselves contain underscores are parsed correctly.
   */
  private def parseTopicPartition(key: String): TopicPartition = {
    val sep = key.lastIndexOf('_')
    require(sep > 0 && sep < key.length - 1, s"Malformed topic_partition value: $key")
    new TopicPartition(key.substring(0, sep), key.substring(sep + 1).toInt)
  }
}
SparkStreaming程序
(查询历史偏移量、处理来自Kafka的数据、将处理结果及偏移量写入到MySQL中)
package cn._51doit.spark.day15

import java.sql.{Connection, PreparedStatement}

import cn._51doit.spark.utils.{DruidConnectionPool, OffsetUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Streaming word count over the Kafka topic `wordcount`.
 *
 * This is an aggregation-style job: for every non-empty batch the per-word
 * counts are collected to the driver and written to MySQL together with the
 * batch's Kafka offsets inside a single JDBC transaction. On start-up the
 * previously stored offsets are read back and passed to the Kafka consumer
 * strategy.
 *
 * Program arguments: `<appId> <groupId>`.
 */
object KafkaWordCountStoreDataAndOffsetInMySQL {

  def main(args: Array[String]): Unit = {
    require(args.length >= 2,
      "Usage: KafkaWordCountStoreDataAndOffsetInMySQL <appId> <groupId>")
    val appId = args(0)
    val groupId = args(1)
    // Key under which offsets are stored in t_kafka_offset; it is the same for
    // every batch, so build it once.
    val appGid = appId + "_" + groupId

    // StreamingContext wraps (and holds a reference to) the SparkContext.
    val conf = new SparkConf().setAppName(appId).setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    // Kafka consumer parameters.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      // Offsets are persisted by this job itself, so auto-commit is disabled.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("wordcount")

    // Read the stored offsets (this query runs on the driver).
    val offset: Map[TopicPartition, Long] =
      OffsetUtils.queryHistoryOffsetFromMySQL(appId, groupId)

    // Direct-stream integration between Spark Streaming and Kafka.
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(
        ssc, // the StreamingContext
        LocationStrategies.PreferConsistent, // location strategy
        // consumer strategy: subscribed topics, Kafka params, stored offsets
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offset)
      )

    kafkaDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Offset ranges covered by this batch.
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Word-count aggregation.
        val lines = rdd.map(_.value())
        val reduced: RDD[(String, Int)] =
          lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        // Bring the aggregated result to the driver (only reasonable for
        // aggregation-style jobs, where the result is small).
        val results: Array[(String, Int)] = reduced.collect()
        try {
          saveResultsAndOffsets(appGid, results, offsetRanges)
        } catch {
          case e: Exception =>
            e.printStackTrace()
            // The batch could not be stored: stop the streaming application.
            // NOTE(review): stop() is invoked from inside the output
            // operation, as in the original code - confirm this cannot block
            // on the batch that is currently running.
            ssc.stop(true)
        }
      }
    })

    // Start the computation and park the driver until termination.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Writes one batch's word counts and Kafka offsets to MySQL in a single
   * transaction: either both are committed, or the transaction is rolled back
   * and the failure is rethrown to the caller.
   *
   * @param appGid       key identifying application + consumer group in `t_kafka_offset`
   * @param results      (word, count) pairs of the batch
   * @param offsetRanges Kafka offset ranges of the batch; each `untilOffset` is stored
   */
  private def saveResultsAndOffsets(
      appGid: String,
      results: Array[(String, Int)],
      offsetRanges: Array[OffsetRange]): Unit = {
    var connection: Connection = null
    var pstm1: PreparedStatement = null
    var pstm2: PreparedStatement = null
    try {
      connection = DruidConnectionPool.getConnection
      // Begin the transaction.
      connection.setAutoCommit(false)

      // Upsert the word counts into t_wordcount (columns: word, counts).
      pstm1 = connection.prepareStatement(
        "INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?")
      if (results.nonEmpty) {
        for (t <- results) {
          pstm1.setString(1, t._1) // the word
          pstm1.setLong(2, t._2)
          pstm1.setLong(3, t._2)
          pstm1.addBatch()
        }
        // Submitted as one JDBC batch; whether that reduces round-trips
        // depends on the driver configuration.
        pstm1.executeBatch()
      }

      // Upsert the end offset of every topic-partition into t_kafka_offset.
      pstm2 = connection.prepareStatement(
        "INSERT INTO t_kafka_offset (app_gid, topic_partition, offset) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset = ?")
      if (offsetRanges.nonEmpty) {
        for (range <- offsetRanges) {
          val untilOffset = range.untilOffset
          pstm2.setString(1, appGid)
          pstm2.setString(2, range.topic + "_" + range.partition)
          pstm2.setLong(3, untilOffset)
          pstm2.setLong(4, untilOffset)
          pstm2.addBatch()
        }
        pstm2.executeBatch()
      }

      // Commit results and offsets together.
      connection.commit()
    } catch {
      case e: Exception =>
        // connection is still null when obtaining it was what failed; a
        // failing rollback must not hide the original error either.
        if (connection != null) {
          try {
            connection.rollback()
          } catch {
            case rollbackError: Exception => e.addSuppressed(rollbackError)
          }
        }
        throw e
    } finally {
      // Release resources; nested so that one failing close() does not leak
      // the others.
      try {
        if (pstm2 != null) pstm2.close()
      } finally {
        try {
          if (pstm1 != null) pstm1.close()
        } finally {
          if (connection != null) connection.close()
        }
      }
    }
  }
}