How Spark Streaming's fileStream Is Implemented
fileStream is one of Spark Streaming's basic sources. It performs "near real time" analysis of files newly written to a specified directory (call it dataDirectory) on HDFS or on a file system compatible with the HDFS API. Files in dataDirectory must satisfy the following constraints:
(1) All files must have the same format, for example, all text files;
(2) Files must appear in dataDirectory in a specific way: they must be atomically moved or renamed into dataDirectory;
(3) Once a file has been moved or renamed into dataDirectory, it must not be changed; for example, data appended to such a file may not be processed.
It is called "near real time" because of constraint (2): a data file can be processed only after it has been completely written and then moved or renamed into dataDirectory.
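Constraint (2) is usually met by writing the file somewhere else first and renaming it into the watched directory only once it is complete. The sketch below shows that pattern on a local POSIX file system in Python; it is an illustration rather than part of Spark, and the helper name publish_atomically and the staging directory are hypothetical. On HDFS the equivalent step would be a rename through the Hadoop FileSystem API.

```python
import os
import tempfile

def publish_atomically(data_directory: str, staging_directory: str,
                       name: str, content: str) -> str:
    """Hypothetical helper: write a file completely in staging_directory,
    then move it into data_directory with a single rename."""
    staging_path = os.path.join(staging_directory, name)
    with open(staging_path, "w") as f:
        f.write(content)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes are on disk before publishing
    final_path = os.path.join(data_directory, name)
    # On POSIX, os.replace is atomic when both paths are on the same file system,
    # so a directory scanner sees either no file or the complete file.
    os.replace(staging_path, final_path)
    return final_path

with tempfile.TemporaryDirectory() as root:
    data_dir = os.path.join(root, "dataDirectory")
    staging_dir = os.path.join(root, "staging")  # kept outside the watched directory
    os.makedirs(data_dir)
    os.makedirs(staging_dir)
    published = publish_atomically(data_dir, staging_dir, "part-0001.txt", "hello\n")
    with open(published) as f:
        published_content = f.read()
    data_listing = sorted(os.listdir(data_dir))
    staging_listing = sorted(os.listdir(staging_dir))

print(data_listing, staging_listing)  # ['part-0001.txt'] []
```

Keeping the staging directory outside dataDirectory means the scanner never lists the half-written file at all.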
fileStream takes the following parameters:
directory: the directory containing the files to be analyzed;
filter: a user-specified filter for selecting files in the directory;
newFilesOnly: when the application starts, the directory may already contain some files. If newFilesOnly is true, these files are ignored; if it is false, they are analyzed as well;
conf: user-specified Hadoop configuration properties.
Note: fileStream has two other overloads, which are not covered here.
If the files to be analyzed are text files, Spark provides a convenience method, textFileStream, which takes only the directory and yields each line of the new files as a string.
The principle behind fileStream is fairly simple: at a fixed interval (duration), it repeatedly scans dataDirectory; at each scan time now, the newly written files, that is, files whose last-modified time falls in the interval (now - duration, now], are wrapped into an RDD for Spark to process.
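To make the window concrete, here is a minimal Python model of a single scan. It is a sketch, not Spark code: the function name scan_once and the dictionary of modification times are hypothetical stand-ins for listing dataDirectory.

```python
from typing import Dict, List

def scan_once(mod_times: Dict[str, int], now: int, duration: int) -> List[str]:
    """Return files whose last-modified time lies in (now - duration, now].

    mod_times maps a file name to its last-modified time (e.g. in ms).
    Only the basic time window is modeled here, not Spark's full filter.
    """
    return sorted(
        name for name, mtime in mod_times.items()
        if now - duration < mtime <= now
    )

files = {"a.txt": 1000, "b.txt": 1500, "c.txt": 2000, "d.txt": 2500}
print(scan_once(files, now=2000, duration=1000))  # ['b.txt', 'c.txt']
print(scan_once(files, now=3000, duration=1000))  # ['d.txt']
```

Note that the interval is open on the left and closed on the right: a.txt, modified exactly at now - duration, belongs to the previous scan, while c.txt, modified exactly at now, belongs to this one.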
The core abstraction of Spark Streaming is the DStream, and fileStream is implemented by one of its subclasses, FileInputDStream.
The core logic of FileInputDStream, detecting new files and wrapping them into an RDD, is implemented in its compute method, which overrides DStream's compute.
The comment on the compute method raises an important question: why does it need to maintain a list of recently analyzed files?
Assume the monitored directory is dataDirectory, the detection interval is duration, and the current time is now. A selected file must have a last-modified time in the interval (now - duration, now]. Relative to this interval, a file's last-modified time falls into one of four cases:
(1) it is less than or equal to now - duration;
(2) it lies in the open interval (now - duration, now);
(3) it equals now;
(4) it is greater than now.
Consider case (3), a file whose last-modified time equals now. The file may have been moved into dataDirectory before the detection, during the detection, or after the detection has completed. In the latter two cases the file is "lost" (that is, it is never processed): the next detection happens at now + duration and covers the range (now, now + duration], which does not include a last-modified time equal to now. To avoid or reduce such losses, Spark Streaming's fileStream extends the detection range backwards to (now - n * duration, now], as shown below:
[Figure: timeline showing the remember window, n * duration long, extending from the ignore threshold (now - n * duration) to the current batch time (now).]
In other words, each detection selects files whose last-modified time falls in (ignore threshold, current batch time]. Some of these files may already have been analyzed by an earlier detection, so to avoid analyzing them twice, we need to record which files in this interval (the remember window) have already been analyzed.
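The loss scenario and the fix can be replayed with a small Python model. It is a sketch under simplifying assumptions, not Spark's code: run_scans is a hypothetical name, each file is described by the time it becomes visible in the directory listing and its last-modified time, and the set of already selected files never expires here (Spark prunes it, as described later). The file late.txt was last modified at 2000 but is moved into the directory at 2001, just after the scan at 2000.

```python
from typing import Dict, List, Set, Tuple

def run_scans(files: Dict[str, Tuple[int, int]], scan_times: List[int],
              duration: int, n: int) -> List[List[str]]:
    """Simulate repeated scans over a remember window of n * duration.

    files maps name -> (visible_at, mod_time). Each scan at time `now` selects
    visible files with mod_time in (now - n * duration, now] that were not
    selected before. n = 1 reproduces the plain single-interval window.
    """
    seen: Set[str] = set()  # files already analyzed (never pruned in this model)
    batches = []
    for now in scan_times:
        threshold = now - n * duration
        batch = sorted(
            name for name, (visible_at, mtime) in files.items()
            if visible_at <= now and threshold < mtime <= now and name not in seen
        )
        seen.update(batch)
        batches.append(batch)
    return batches

files = {"early.txt": (1500, 1500), "late.txt": (2001, 2000)}
print(run_scans(files, [1000, 2000, 3000], 1000, n=1))  # [[], ['early.txt'], []]
print(run_scans(files, [1000, 2000, 3000], 1000, n=3))  # [[], ['early.txt'], ['late.txt']]
```

With n = 1, late.txt is never selected; with the wider window it is picked up at 3000, and the seen set keeps early.txt from being analyzed a second time even though its modification time is still inside the window.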
Let's walk through the compute process step by step:
1. Finding new files
(1) Ignore threshold
Two variables are important in this step: initialModTimeIgnoreThreshold and durationToRemember.
The value of initialModTimeIgnoreThreshold depends on newFilesOnly, which indicates whether the Spark Streaming app, when it starts, analyzes the files already present in dataDirectory:
newFilesOnly == true: the existing files in dataDirectory need not be analyzed, so initialModTimeIgnoreThreshold is set to the current time, meaning only files whose last-modified time is later than the current time are analyzed;
newFilesOnly == false: the existing files in dataDirectory need to be analyzed, so initialModTimeIgnoreThreshold is set to 0 (every file's last-modified time is greater than 0).
durationToRemember is derived from two values:
slideDuration: the detection interval;
minRememberDurationS: defaults to 60 s and can be changed through the property spark.streaming.fileStream.minRememberDuration.
In the FileInputDStream source, durationToRemember = slideDuration * math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt. durationToRemember is the remember window mentioned earlier: files analyzed within this interval are recorded.
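The effect of the rounding is easier to see with numbers. This Python sketch reproduces the formula with milliseconds as plain integers; it assumes batchDuration equals slideDuration, which holds for an input stream such as FileInputDStream, and the function name is hypothetical.

```python
import math

def duration_to_remember_ms(slide_duration_ms: int, min_remember_ms: int = 60_000) -> int:
    """Round minRememberDuration up to a whole number of batches, then convert
    back to milliseconds (batchDuration is assumed equal to slideDuration)."""
    num_batches = math.ceil(min_remember_ms / slide_duration_ms)
    return slide_duration_ms * num_batches

print(duration_to_remember_ms(7_000))   # 63000: ceil(60000 / 7000) = 9 batches
print(duration_to_remember_ms(10_000))  # 60000: exactly 6 batches
print(duration_to_remember_ms(90_000))  # 90000: a single batch
```

So with a 7-second batch the window is slightly longer than 60 s, and with a batch longer than 60 s it covers exactly one batch.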
The ignore threshold is the larger of initialModTimeIgnoreThreshold and currentTime - durationToRemember.milliseconds. This means that even when newFilesOnly is false, not all files in dataDirectory are analyzed: only files whose last-modified time is greater than currentTime - durationToRemember.milliseconds are.
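A minimal Python sketch of the threshold, assuming times are plain milliseconds and that initialModTimeIgnoreThreshold is fixed when the stream is created; app_start_ms is a hypothetical parameter standing for that moment.

```python
def mod_time_ignore_threshold(current_ms: int, new_files_only: bool,
                              app_start_ms: int, remember_ms: int) -> int:
    """Files must be modified strictly after the returned time to be considered."""
    # initialModTimeIgnoreThreshold: the start time if newFilesOnly, else 0
    initial = app_start_ms if new_files_only else 0
    return max(initial, current_ms - remember_ms)

# App started at t = 100 s with a 60 s remember window.
print(mod_time_ignore_threshold(100_000, False, 100_000, 60_000))  # 40000
print(mod_time_ignore_threshold(100_000, True, 100_000, 60_000))   # 100000
print(mod_time_ignore_threshold(200_000, True, 100_000, 60_000))   # 140000
```

The first line shows the effect described above: even with newFilesOnly false, files modified at or before 40 000 ms are skipped. The last line shows that once enough time has passed, the remember window rather than newFilesOnly decides the threshold.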
(2) Creating the filter instance
The filter instance is actually a Hadoop PathFilter, built from the isNewFile method. As the name suggests, it selects new files; a file counts as new only if it meets all four of the following conditions:
A. the file path matches the user-specified filter;
B. the file's last-modified time is greater than modTimeIgnoreThreshold;
C. the file's last-modified time is less than or equal to currentTime;
D. the file has not been analyzed yet, that is, it does not appear in recentlySelectedFiles, the list of recently analyzed files.
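The four conditions can be expressed as a single predicate. This is a Python sketch rather than Spark's Scala isNewFile: make_is_new_file and its parameters are hypothetical, and mod_time_of stands in for asking the file system for a file's modification time.

```python
from typing import Callable, Set

def make_is_new_file(user_filter: Callable[[str], bool],
                     mod_time_of: Callable[[str], int],
                     threshold_ms: int, current_ms: int,
                     recently_selected: Set[str]) -> Callable[[str], bool]:
    """Build a predicate that accepts a path only if conditions A to D all hold."""
    def is_new_file(path: str) -> bool:
        if not user_filter(path):          # A: user-specified filter
            return False
        mtime = mod_time_of(path)
        if mtime <= threshold_ms:          # B: must be newer than the ignore threshold
            return False
        if mtime > current_ms:             # C: must not be newer than currentTime
            return False
        if path in recently_selected:      # D: must not have been analyzed recently
            return False
        return True
    return is_new_file

mod_times = {"/d/a.txt": 50, "/d/b.txt": 150, "/d/c.log": 150,
             "/d/e.txt": 250, "/d/f.txt": 180}
is_new = make_is_new_file(lambda p: p.endswith(".txt"), mod_times.__getitem__,
                          threshold_ms=100, current_ms=200,
                          recently_selected={"/d/f.txt"})
selected = sorted(p for p in mod_times if is_new(p))
print(selected)  # ['/d/b.txt']
```

Each rejected file fails exactly one condition: a.txt fails B, c.log fails A, e.txt fails C, and f.txt fails D.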
Condition C needs some explanation: why must a file's last-modified time not be greater than currentTime? This mainly prevents files from being analyzed repeatedly when a Spark Streaming application restarts.
Suppose the application terminates at time and restarts at time + 5 * duration, and that recentlySelectedFiles holds only the files analyzed within the most recent duration, i.e., its window is duration. After the restart, the batches missed during the downtime are regenerated, so the first detection runs at time + duration. If files whose last-modified time is greater than currentTime (here time + duration) were allowed, every file modified in (time + duration, +∞) would be analyzed and recorded in recentlySelectedFiles. The second detection runs at time + 2 * duration; because the window of recentlySelectedFiles is duration, its contents can be considered cleared by then. If files modified after currentTime (now time + 2 * duration) were allowed, every file modified in (time + 2 * duration, +∞) would be analyzed, so those files would be analyzed a second time. The detections at time + 3 * duration, time + 4 * duration, and time + 5 * duration would repeat files in the same way. In short, each detection must select only files whose last-modified time is not greater than currentTime.
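The restart argument can be checked with a toy replay in Python. All values are hypothetical: time = 0, duration = 10, batches regenerated at 10 through 50, a remember window of one duration covering (now - 10, now], and two files modified at 25 and 45. replay_batches (also a hypothetical name) counts how often each file is selected with and without condition C.

```python
from typing import Dict, List

def replay_batches(mod_times: Dict[str, int], batch_times: List[int],
                   remember: int, enforce_upper_bound: bool) -> Dict[str, int]:
    """Count how many batches select each file."""
    selected_at: Dict[int, List[str]] = {}   # batch time -> files selected then
    counts = {name: 0 for name in mod_times}
    for now in batch_times:
        # Keep only batches inside the remember window (now - remember, now].
        selected_at = {t: fs for t, fs in selected_at.items() if t > now - remember}
        recent = {f for fs in selected_at.values() for f in fs}
        threshold = now - remember
        batch = [name for name, mtime in mod_times.items()
                 if mtime > threshold
                 and (mtime <= now or not enforce_upper_bound)  # condition C
                 and name not in recent]
        selected_at[now] = batch
        for name in batch:
            counts[name] += 1
    return counts

files = {"f1.txt": 25, "f2.txt": 45}
batches = [10, 20, 30, 40, 50]
print(replay_batches(files, batches, remember=10, enforce_upper_bound=False))
# {'f1.txt': 3, 'f2.txt': 5}
print(replay_batches(files, batches, remember=10, enforce_upper_bound=True))
# {'f1.txt': 1, 'f2.txt': 1}
```

Without condition C, a file dated in the future is re-selected at every batch until its modification time drops below the threshold; with it, each file is selected exactly once, in the batch whose window contains its modification time.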
(3) Getting the paths of the files that pass the filter
At this point, the process of finding new files is complete.
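Step (3) amounts to listing the directory and keeping only the paths the filter accepts; Hadoop's FileSystem.listStatus has an overload that takes a PathFilter for this purpose. The Python sketch below imitates the idea on a local directory; list_new_files is a hypothetical name, and the filter checks only conditions B and C, using an arbitrary threshold and current time in seconds.

```python
import os
import tempfile
from typing import Callable, List

def list_new_files(directory: str, path_filter: Callable[[str], bool]) -> List[str]:
    """Return the paths of regular files in directory that path_filter accepts."""
    return sorted(entry.path for entry in os.scandir(directory)
                  if entry.is_file() and path_filter(entry.path))

with tempfile.TemporaryDirectory() as d:
    for name, mtime in [("old.txt", 1_000), ("new.txt", 5_000), ("future.txt", 20_000)]:
        path = os.path.join(d, name)
        with open(path, "w") as f:
            f.write("data\n")
        os.utime(path, (mtime, mtime))  # set access and modification times (seconds)
    threshold, current = 2_000, 10_000

    def accept(p: str) -> bool:
        # Conditions B and C only: threshold < mtime <= current
        return threshold < os.path.getmtime(p) <= current

    found = [os.path.basename(p) for p in list_new_files(d, accept)]

print(found)  # ['new.txt']
```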
2. Adding the newly found files to the list of analyzed files
Expired entries in recentlySelectedFiles are cleaned up by the clearMetadata method.
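One way to make that cleanup possible is to remember which files each batch selected, so that expired batches can be dropped together with their files. The Python sketch below assumes that layout; SelectedFilesTracker, add_batch, and clear_metadata are hypothetical names loosely mirroring recentlySelectedFiles and clearMetadata, and a batch counts as expired when its time is earlier than time minus the remember duration.

```python
from typing import Dict, List, Set

class SelectedFilesTracker:
    """Hypothetical bookkeeping: files selected per batch plus a flat lookup set."""

    def __init__(self, remember: int) -> None:
        self.remember = remember
        self.batch_time_to_files: Dict[int, List[str]] = {}
        self.recently_selected: Set[str] = set()

    def add_batch(self, batch_time: int, files: List[str]) -> None:
        # Step 2: record the newly found files for this batch.
        self.batch_time_to_files[batch_time] = list(files)
        self.recently_selected.update(files)

    def clear_metadata(self, time: int) -> None:
        # Drop batches older than the remember window and forget their files.
        expired = [t for t in self.batch_time_to_files if t < time - self.remember]
        for t in expired:
            self.recently_selected.difference_update(self.batch_time_to_files.pop(t))

tracker = SelectedFilesTracker(remember=20)
tracker.add_batch(10, ["a"])
tracker.add_batch(20, ["b"])
tracker.add_batch(30, ["c"])
tracker.clear_metadata(40)  # expires batches with time < 20, i.e. only batch 10
print(sorted(tracker.recently_selected))  # ['b', 'c']
```

Keeping the per-batch map next to the flat set makes the membership check for condition D a single set lookup, while expiry can still remove exactly the files that belonged to old batches.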
3. Wrapping the new files into an RDD