import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    // The batchDuration controls how often jobs are generated; the StreamingContext
    // is the entry point for Spark Streaming execution.
    val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
    // Feed input locally with e.g. `nc -lk 6666` in another terminal.
    val lines = ssc.socketTextStream(
      "127.0.0.1",
      6666,
      StorageLevel.MEMORY_AND_DISK_SER) // server address, port, storage level
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // The actual scheduling starts here.
    ssc.start()
    ssc.awaitTermination()
  }
}
Creating the context with new StreamingContext(conf, Seconds(5)) invokes this auxiliary constructor:
/**
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
The StreamingContext class structure is as follows:
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  )
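For reference, both common ways of constructing a StreamingContext end up filling these three fields. A minimal usage sketch (assuming conf: SparkConf and an existing sc: SparkContext are already defined):
// Create from a SparkConf: a new SparkContext is created internally; the checkpoint (cp_) is null.
val ssc1 = new StreamingContext(conf, Seconds(5))
// Create from an existing SparkContext: the given sc is reused; cp_ is again null.
val ssc2 = new StreamingContext(sc, Seconds(5))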
The data source is obtained through ssc.socketTextStream(…), which returns a ReceiverInputDStream object:
/**
 * Create an input stream from a TCP source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as UTF8-encoded, `\n`-delimited
 * lines.
 * @param hostname Hostname to connect to for receiving data
 * @param port Port to connect to for receiving data
 * @param storageLevel Storage level to use for storing the received objects
 * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
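SocketReceiver.bytesToLines is the converter passed in here; conceptually it just wraps the InputStream and yields UTF-8 lines. A simplified stand-in with the same shape (not Spark's actual implementation, which is built on its own NextIterator) could look like:
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

def bytesToLines(inputStream: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
  // Read line by line until the stream is exhausted; readLine() returns null at EOF.
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}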
/**
 * Create an input stream from a TCP source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as objects using the given
 * converter.
 * @param hostname Hostname to connect to for receiving data
 * @param port Port to connect to for receiving data
 * @param converter Function to convert the byte stream to objects
 * @param storageLevel Storage level to use for storing the received objects
 * @tparam T Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
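getReceiver() is the only thing SocketInputDStream has to provide: a Receiver that knows how to pull data and hand it to Spark via store(). Any custom receiver follows the same pattern; here is a hedged sketch of the Receiver API that SocketReceiver implements (a simplified, hypothetical receiver, not Spark's actual SocketReceiver):
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyTcpReceiver(host: String, port: Int, storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) {

  def onStart(): Unit = {
    // Start a daemon thread so onStart() returns immediately.
    new Thread("My TCP Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = { /* the reading thread checks isStopped and exits on its own */ }

  private def receive(): Unit = {
    val socket = new java.net.Socket(host, port)
    try {
      val lines = scala.io.Source.fromInputStream(socket.getInputStream, "UTF-8").getLines()
      while (!isStopped && lines.hasNext) {
        store(lines.next()) // hand each record to Spark; it is stored at `storageLevel`
      }
    } finally {
      socket.close()
    }
  }
}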
ReceiverInputDStream is a subclass of InputDStream. In InputDStream:
abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
  extends DStream[T](ssc_) {
  private[streaming] var lastValidTime: Time = null
  ssc.graph.addInputStream(this)
  ...
As you can see, when an InputDStream is instantiated it adds itself to ssc.graph, and ssc.graph is the DStreamGraph.
final private[streaming] class DStreamGraph extends Serializable with Logging {
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()
The DStreamGraph member inputStreams is where the InputDStreams are kept.
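addInputStream itself is straightforward; roughly (slightly abridged from the Spark 1.x sources discussed here), it attaches the graph to the stream and appends it to inputStreams:
def addInputStream(inputStream: InputDStream[_]): Unit = {
  this.synchronized {
    inputStream.setGraph(this)
    inputStreams += inputStream
  }
}
Back in the WordCount example, the transformations and the output operation come next: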
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
flatMap(), map(), and the like are transformation operations; each one returns a corresponding DStream. Take map() as an example:
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
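The MappedDStream created here is itself tiny; a slightly abridged sketch of its Spark 1.x definition (in package org.apache.spark.streaming.dstream):
private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  // The parent is the only dependency; this is how the DStream lineage is recorded.
  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // For each batch time, compute the parent's RDD and apply mapFunc to it.
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}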
The first constructor argument is the parent DStream; this is how a DStream records its dependencies. print() is an output operation: it creates a ForEachDStream and registers it with the DStreamGraph:
new ForEachDStream(this,
  context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
/**
 * Register this streaming as an output stream. This would ensure that RDDs of this
 * DStream will be generated.
 */
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
addOutputStream simply appends this ForEachDStream to the DStreamGraph member outputStreams.
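Roughly, it mirrors addInputStream (again slightly abridged from the Spark 1.x sources):
def addOutputStream(outputStream: DStream[_]): Unit = {
  this.synchronized {
    outputStream.setGraph(this)
    outputStreams += outputStream
  }
}
With the DStreamGraph now holding both the input and output streams, the next call in the example is ssc.start():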
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
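Seen from user code, the state match above means a StreamingContext can be started exactly once. A hypothetical illustration of the three branches (assuming conf is already defined):
val ssc = new StreamingContext(conf, Seconds(5))
ssc.start()                          // INITIALIZED -> ACTIVE, scheduler started on the "streaming-start" thread
ssc.start()                          // already ACTIVE: only logs a warning
ssc.stop(stopSparkContext = false)   // ACTIVE -> STOPPED
// ssc.start()                       // would now throw IllegalStateException: already stopped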
start() first checks the current state of the StreamingContext: when a context is already ACTIVE, a new StreamingContext is not allowed to run, since only one StreamingContext may be active in a JVM at a time. If the state is INITIALIZED, a streaming-start thread is launched, which calls scheduler.start(). The scheduler here is defined as follows:
private[streaming] val scheduler = new JobScheduler(this)
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()
  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)
  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
JobScheduler is the overall coordinator of Spark Streaming jobs. Its start() method first creates an EventLoop[JobSchedulerEvent] to handle the various JobSchedulerEvents.
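Conceptually, EventLoop is just an event queue drained by a dedicated daemon thread, with onReceive/onError supplied by the subclass. A simplified sketch of the idea (a hypothetical SimpleEventLoop, not Spark's actual org.apache.spark.util.EventLoop):
import java.util.concurrent.LinkedBlockingDeque
import scala.util.control.NonFatal

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take() // block until an event is posted
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() was called; exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}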
JobScheduler has two very important members: JobGenerator and ReceiverTracker. JobScheduler delegates the actual generation of each batch's RDD DAG to JobGenerator, and delegates the tracking of input data from the sources to ReceiverTracker.