
Spark Streaming WordCount Example and Source Code Analysis (Part 1)


1. Code Example

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

        // Set the batchDuration interval, which controls how often jobs are generated,
        // and create the entry point for Spark Streaming execution
        val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
        val lines = ssc.socketTextStream(
            "127.0.0.1",
            6666,
            StorageLevel.MEMORY_AND_DISK_SER) // host, port, storage level for received data
        val words = lines.flatMap(_.split(","))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()

        // The actual scheduling starts here
        ssc.start()
        ssc.awaitTermination()
    }
}
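To run the example you need something listening on port 6666 that writes newline-delimited text; the quickest option is a netcat session (nc -lk 6666). Below is a minimal stand-alone Scala sketch of such a test feeder; the object name FeedSocket and the sample line are made up here purely for illustration:

import java.io.PrintWriter
import java.net.ServerSocket

object FeedSocket {
    def main(args: Array[String]): Unit = {
        val server = new ServerSocket(6666)      // the port the WordCount job connects to
        println("Waiting for the streaming job to connect...")
        val socket = server.accept()             // blocks until ssc.socketTextStream() connects
        val out = new PrintWriter(socket.getOutputStream, true)
        while (true) {
            out.println("spark,streaming,spark") // one comma-separated line per second
            Thread.sleep(1000)
        }
    }
}

Start the feeder first, then the WordCount job; every 5 seconds the job prints the word counts of the lines received in that batch.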

2. Source Code Analysis

Creating the StreamingContext:
  /**
   * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
   * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }
The StreamingContext class has the following structure:
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  )
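From this signature you can see that a StreamingContext wraps a SparkContext (sc_), an optional Checkpoint (cp_) and the batch Duration. Besides the SparkConf-based constructor used in the example, a StreamingContext can also be built on top of an existing SparkContext; a small sketch (the object name FromExistingContext is made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FromExistingContext {
    def main(args: Array[String]): Unit = {
        val sc  = new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[2]"))
        val ssc = new StreamingContext(sc, Seconds(5)) // reuse an existing SparkContext
        // ... define DStreams here, then ssc.start() / ssc.awaitTermination() as before
    }
}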
The data source is obtained via ssc.socketTextStream(…), and socketTextStream() returns a ReceiverInputDStream object:
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes it interepreted as object using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
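As the code above shows, socketTextStream() is simply socketStream() with SocketReceiver.bytesToLines as the converter, and SocketInputDStream.getReceiver() supplies the SocketReceiver that actually reads the TCP connection. You can also pass your own (InputStream) => Iterator[T] converter to parse the byte stream differently; a minimal sketch, assuming the ssc from the example above (the parseInts helper is made up for illustration):

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel

// Read newline-delimited UTF-8 text and turn every line into an Int
def parseInts(in: InputStream): Iterator[Int] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null).map(_.trim.toInt)
}

val numbers = ssc.socketStream[Int]("127.0.0.1", 6666, parseInts, StorageLevel.MEMORY_AND_DISK_SER)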
ReceiverInputDStream is a subclass of InputDStream, and inside InputDStream:
abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
  extends DStream[T](ssc_) {

  private[streaming] var lastValidTime: Time = null

  ssc.graph.addInputStream(this)
  ...
As you can see, an InputDStream registers itself with ssc.graph when it is instantiated, and ssc.graph is a DStreamGraph.
final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()
The DStreamGraph member inputStreams is where these InputDStreams are kept.
Next comes a series of DStream operations:
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
flatMap(), map() and the like are transformation operations; each of them returns a corresponding DStream. Taking map() as an example:
  /** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }
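Roughly, the MappedDStream created here looks like the following (paraphrased from the Spark 1.x source, so details may differ slightly between versions): it records the parent as its only dependency, and its compute() obtains the parent's RDD for each batch time and applies mapFunc to it.

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}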
Here a MappedDStream is returned; its first argument is the parent DStream, which is how a DStream keeps track of its dependencies. print() is an output operation: it creates a ForEachDStream and registers it with the DStreamGraph:
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  /**
   * Register this streaming as an output stream. This would ensure that RDDs of this
   * DStream will be generated.
   */
  private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }
The addOutputStream method simply appends this ForEachDStream to the DStreamGraph member outputStreams.
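print() is only one output operation; the general-purpose one is foreachRDD(), which goes through exactly the same ForEachDStream + register() path. A minimal sketch, continuing with the wordCounts DStream from the example (the output format is arbitrary):

// Every output operation ends up as a ForEachDStream registered in the DStreamGraph
wordCounts.foreachRDD { (rdd, time) =>
    val top = rdd.take(10) // runs on the driver once per batch
    println(s"Batch at $time: ${top.mkString(", ")}")
}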

After that, ssc.start() is called. In StreamingContext:
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
start() first checks the current state of the StreamingContext. If another StreamingContext is already ACTIVE, a new one is not allowed to run, because only one StreamingContext may be active in a JVM at a time (just as Spark does not support multiple concurrently running SparkContexts). If the state is INITIALIZED, a streaming-start thread is launched which calls scheduler.start(), where scheduler is defined as follows:
private[streaming] val scheduler = new JobScheduler(this)
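As an aside, since only one StreamingContext can be active per JVM, code that might be invoked more than once can use StreamingContext.getActiveOrCreate to reuse the active context or create a new one. A small sketch (the createContext factory simply rebuilds the WordCount setup; availability of getActiveOrCreate may depend on your Spark version):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    new StreamingContext(conf, Seconds(5))
}

// Returns the currently active StreamingContext if there is one, otherwise creates a new one
val ssc = StreamingContext.getActiveOrCreate(createContext _)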

In Spark Streaming, the class responsible for overall dynamic job scheduling is JobScheduler. Each StreamingContext instance holds a JobScheduler instance, and when ssc.start() begins running it calls that JobScheduler instance's start() method.
  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }
JobScheduler is the overall job scheduler of Spark Streaming. In JobScheduler's start() method, an EventLoop[JobSchedulerEvent] is created first, which is used to handle the various JobSchedulerEvents.

JobScheduler has two very important members: JobGenerator and ReceiverTracker. JobScheduler delegates the actual generation of each batch's RDD DAG to JobGenerator, and delegates the tracking of the input data received at the sources to ReceiverTracker.




