In this paper, we present the Stork data scheduler as a solution for mitigating the data bottleneck in e-Science and data-intensive scientific discovery. Stork focuses on planning, scheduling, monitoring and management of data placement tasks and application-level end-to-end optimization of networked inputs/outputs for petascale distributed e-Science applications. Unlike existing approaches, Stork treats data resources and the tasks related to data access and movement as first-class entities just like computational resources and compute tasks, and not simply the side-effect of computation. Stork provides unique features such as aggregation of data transfer jobs considering their source and destination addresses, and an application-level throughput estimation and optimization service. We describe how these two features are implemented in Stork and their effects on end-to-end data transfer performance.
1. Introduction
Scientific applications generate increasingly large amounts of data, often referred to as the ‘data deluge’, which necessitates collaboration and sharing among the national and international research institutions. Simply purchasing high-capacity, high-performance storage systems and adding them to the existing infrastructure of the collaborating institutions does not solve the underlying and highly challenging data-handling problem. Scientists are often forced to spend a great deal of time and energy on solving basic data-handling issues, such as the physical location of data, how to access it and/or how to move it to visualization and/or compute resources for further analysis. The systems managing these resources must provide robust scheduling and allocation of network and storage resources, as well as optimization of end-to-end data transfers over wide area networks (WANs).
According to the Strategic Plan for the US Climate Change Science Programme (CCSP), one of the main objectives of the future research programmes should be ‘enhancing the data management infrastructure’, since ‘The users should be able to focus their attention on the information content of the data, rather than how to discover, access, and use it’ [2, pp. 143–150]. This statement by the CCSP summarizes the goal of many cyberinfrastructure efforts initiated by the US Department of Energy (DOE), National Science Foundation (NSF) and other federal agencies, as well as the research direction of several leading academic institutions. This is also the main motivation for the work presented in this paper.
Traditional distributed computing systems closely couple data handling and computation. They consider data resources as second-class entities, and access to data as a side-effect of computation. Data placement (i.e. access, retrieval and/or movement of data) either is embedded in the computation and causes the computation to delay or is performed by simple scripts that do not have the same privileges as compute jobs. The inadequacy of traditional distributed computing systems in dealing with complex data-handling problems in our new data-rich world has motivated a new paradigm called data-aware distributed computing.
In previous work, we have introduced the concept that the data placement activities in a distributed computing environment need to be first-class entities just like computational jobs, and presented the first batch scheduler specialized in data placement and data movement: Stork. This scheduler implements techniques specific to queuing, scheduling and optimization of data placement jobs, and provides a level of abstraction between the user applications and the underlying data transfer and storage resources. Stork is considered one of the very first examples of ‘data-aware scheduling’ and has been very actively used in many e-Science application areas including coastal hazard prediction and storm surge modelling; oil flow and reservoir uncertainty analysis; digital sky imaging; educational video processing and behavioural assessment; numerical relativity and black hole collisions; and multi-scale computational fluid dynamics.
Our previous work in this area has also been acknowledged by the strategic reports of federal agencies. The DOE Office of Science report on data management challenges defines data movement and efficient access to data as two key foundations of scientific research. The DOE report says: ‘In the same way that the load register instruction is the most basic operation provided by a CPU, so is the placement of data on a storage device. … It is therefore essential that at all levels data placement tasks be treated in the same way computing tasks are treated’, and refers to our previous work. The same report also states that ‘Although many mechanisms exist for data transfer, research and development is still required to create schedulers and planners for storage space allocation and the transfer of data’ [11, pp. 55–56].
In this paper, we elaborate on how a data scheduler like Stork can help to mitigate the large-scale data management problem for e-Science applications. Section 2 presents the related work in this area, and §3 gives background information about the Stork data scheduler. In the following sections, we describe two unique features of Stork: aggregation of data transfer jobs considering their source and destination addresses (§4) and the application-level throughput estimation and optimization service (§5). We conclude the paper in §6.
2. Related work
Several previous studies address data management for large-scale applications [12–20]. However, scheduling of data storage and networking resources and optimization of data transfer tasks has remained an open problem.
In an effort to achieve reliable and efficient data placement, high-level data management tools such as the reliable file transfer (RFT) service, the lightweight data replicator (LDR) and the data replication service (DRS) were developed. The main motivation for these tools was to enable byte streams to be transferred in a reliable manner, by handling possible failures such as dropped connections, machine reboots and temporary network outages automatically via retrying. Most of these tools are built on top of the Grid file transfer protocol (GridFTP) [14,15], which is a secure and reliable data transfer protocol especially developed for high-bandwidth WANs.
Beck et al. introduced logistical networking that performs global scheduling and optimization of data movement, storage and computation based on a model that takes into account all the network’s underlying physical resources. Systems such as the storage resource broker (SRB), the integrated rule oriented data system (iRODS) and the storage resource manager (SRM) were developed to provide a uniform interface for connecting to heterogeneous data resources and accessing replicated datasets.
Thain et al. introduced the batch-aware distributed file system (BAD-FS), which was followed by a modified data-driven batch scheduling system. Their goal was to achieve data-driven batch scheduling by exporting explicit control of storage decisions from the distributed file system to the batch scheduler. Using some simple data-driven scheduling techniques, they have demonstrated that the new data-driven system can achieve better throughput than both current distributed file systems, such as the Andrew file system, and traditional central processing unit (CPU)-centric batch scheduling techniques that use remote inputs/outputs (I/O).
According to Stockinger and Stockinger et al., the entire resource selection problem requires detailed cost models with respect to data transfer. A cost model for data-intensive applications is presented in Stockinger et al., where theoretical models for data-intensive job scheduling are discussed. In that work, a cost model is created that can determine whether it is more efficient to transfer the data to a job or vice versa. The metric for measuring efficiency is the effective time seen by the client application. The model includes all important factors in a distributed data Grid and takes various storage and access latencies into account to determine optimal data access. More general performance engineering approaches are discussed in Stockinger et al. In that work, they analyse a typical Grid system and point out performance analysis aspects in order to improve the overall job execution time of the system. Their focus is on the performance issues regarding data and replica management.
The studies that try to find the optimal number of streams for data scheduling are limited and they are mostly based on approximate theoretical models [33–36]. They all have specific constraints and assumptions. Also, the correctness of these models is mostly demonstrated with simulation results. Hacker et al. claim that the total number of streams behaves like one giant stream whose capacity is the sum of each stream’s achievable throughput. However, this model only works for uncongested networks. Thus it is not able to predict when the network will be congested. Another study proposes the same theory but also develops a protocol that provides fairness. Lu et al. model the bandwidth of multiple streams as a partial second-order polynomial equation and need two different throughput measurements of different stream numbers to predict the others. In another model, the total throughput always shows the same characteristics, depending on the capacity of the connection, as the number of streams increases, and three streams are sufficient to get a 90 per cent utilization. None of the existing studies are able to accurately predict the optimal number of parallel streams for best data throughput in a congested network.
3. Stork data scheduler
The Stork data scheduler [24,37,38] implements techniques specific to queuing, scheduling and optimization of data placement jobs, provides high reliability in data transfers and creates a level of abstraction between the user applications and the underlying data transfer and storage resources (including file transfer protocol (FTP), hypertext transfer protocol (HTTP), GridFTP, SRM, SRB and iRODS) via a modular, uniform interface. Stork is considered one of the very first examples of ‘data-aware scheduling’ and has been very actively used in many e-Science application areas, including: coastal hazard prediction, reservoir uncertainty analysis, digital sky imaging, educational video processing, numerical relativity and multi-scale computational fluid dynamics resulting in breakthrough research [5–10,38–40].
Using Stork, the users can transfer very large datasets via a single command. The checkpointing, error recovery and retry mechanisms ensure the completion of the tasks even in the case of unexpected failures. Multi-protocol support makes Stork a very powerful data transfer tool. This feature allows Stork not only to access and manage different data storage systems, but also to be used as a fall-back mechanism when one of the protocols fails in transferring the desired data. Optimizations such as concurrent transfers, parallel streaming, request aggregation and data fusion provide enhanced performance compared with other data transfer tools. The Stork data scheduler can also interact with higher level planners and workflow managers for the coordination of compute and data tasks. This allows the users to schedule both CPU resources and storage resources asynchronously as two parallel universes, overlapping computation and I/O.
Our initial end-to-end implementation using Stork consists of two parallel universes: a data subsystem and a compute subsystem. These two subsystems are complementary, the first specializing in data management and scheduling and the latter specializing in compute management and scheduling. The orchestration of these two parallel subsystems is performed by the upper layer workflow planning and execution components. In cases where multiple workflows need to be executed on the same system, users may want to prioritize among multiple workflows or make other scheduling decisions. This is handled by the workflow scheduler at the highest level. An example of such a case would be hurricanes of different urgency levels arriving at the same time.
When integrating data placement into end-to-end workflow planning and management systems, we make use of similarities to instruction pipelining in microprocessors, where the life cycle of an instruction consists of steps such as fetch, decode, execute and write. A distributed workflow system can be viewed as a large pipeline consisting of many tasks divided into sub-stages, where the main bottleneck is remote data access/retrieval owing to network latency and communication overhead. Just as pipelining techniques are used to overlap different types of jobs and execute them concurrently while preserving the task sequence and dependencies, we can order and schedule data movement jobs in distributed systems independent of compute tasks to exploit parallel execution while preserving data dependencies. For this purpose, we expand the scientific workflows to include the necessary data-placement steps, such as stage-in and stage-out, as well as other important steps that support data movement, such as allocating and de-allocating storage space for the data and reserving and releasing the network links.
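The expansion of a workflow with data-placement steps can be sketched as follows. This is a minimal illustration only, not the interface of Stork, Pegasus or DAGMan: the `Task` record, the step names and the function `expand_with_data_placement` are hypothetical, and the sketch assumes that every compute task needs one storage allocation, one stage-in and one stage-out.

```python
from collections import namedtuple

# Hypothetical task record: a name plus the names of the tasks it depends on.
Task = namedtuple("Task", ["name", "deps"])


def expand_with_data_placement(compute_tasks):
    """Wrap each compute task with explicit data-placement steps.

    Each compute task C becomes the chain
        allocate -> stage_in -> C -> stage_out -> release,
    and a dependency on C is rewritten as a dependency on C's stage_out,
    so that data dependencies between tasks are preserved.
    """
    expanded = []
    for task in compute_tasks:
        alloc = f"allocate:{task.name}"
        stage_in = f"stage_in:{task.name}"
        stage_out = f"stage_out:{task.name}"
        release = f"release:{task.name}"
        # Input data become available only after upstream stage-out steps.
        upstream = [f"stage_out:{dep}" for dep in task.deps]
        # Allocation has no dependencies, so it can overlap with
        # upstream computation.
        expanded.append(Task(alloc, []))
        expanded.append(Task(stage_in, [alloc] + upstream))
        expanded.append(Task(task.name, [stage_in]))
        expanded.append(Task(stage_out, [task.name]))
        expanded.append(Task(release, [stage_out]))
    return expanded


workflow = [Task("simulate", []), Task("visualize", ["simulate"])]
for step in expand_with_data_placement(workflow):
    print(step.name, "<-", step.deps)
```

Because the data-placement steps are separate nodes in the expanded graph, a data scheduler can run them independently of the compute tasks while the dependency lists keep the original ordering intact.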
We have done a preliminary implementation of data-aware workflow management where data-awareness components were added to the workflow planning tool Pegasus and workflow execution tool Condor DAGMan. This preliminary implementation was used in coastal hazard prediction and reservoir uncertainty analysis applications. We have developed a preliminary model to choose between remote I/O and staging for particular applications. As one of the results of that work, we suggest that, for remote I/O to be more efficient than staging, the following equation should hold true:

$$N_r - N_s < R_l + W_l, \tag{3.1}$$

where $R_l$ is the time to read from local disk to local memory, $W_l$ is the time to write from local memory to local disk, $N_s$ is the time to send data over the network via a staging protocol and $N_r$ is the time to send data over the network via a remote I/O protocol. This equation shows that the time difference coming from using a specialized data transfer protocol versus a remote I/O protocol should be less than the overhead of extra read/write to the disk in staging. In other words, if your remote I/O library performs well in data transfer over the network, or your local disk performance is slow, remote I/O might be advantageous over staging. Otherwise, the staging method would perform better. One challenge would be estimating these parameters without actually running the application.
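The choice between remote I/O and staging described above can be expressed as a small predicate. This is a sketch under stated assumptions: the function name and the example timings are hypothetical, and the four time estimates are taken as given rather than measured.

```python
def remote_io_is_preferable(r_l, w_l, n_s, n_r):
    """Return True when remote I/O is expected to beat staging.

    r_l: time to read from local disk to local memory
    w_l: time to write from local memory to local disk
    n_s: time to send the data over the network via a staging protocol
    n_r: time to send the data over the network via a remote I/O protocol
    All times must be in the same unit (for example, seconds).
    """
    # Extra network time paid by remote I/O, compared with the extra
    # disk write and read paid by staging.
    return (n_r - n_s) < (r_l + w_l)


# Hypothetical timings in seconds, chosen only to illustrate both outcomes.
print(remote_io_is_preferable(r_l=4.0, w_l=6.0, n_s=20.0, n_r=25.0))  # 5 < 10
print(remote_io_is_preferable(r_l=4.0, w_l=6.0, n_s=20.0, n_r=35.0))  # 15 < 10
```

In the first call the remote I/O protocol is only 5 s slower on the network while staging costs 10 s of extra disk activity, so remote I/O is preferred; in the second call the network penalty of 15 s outweighs the disk overhead and staging is preferred.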
For this purpose, the Stork data scheduler includes network and storage bandwidth-monitoring capabilities that collect statistics on the maximum available end-to-end bandwidth, actual bandwidth utilization, latency and the number of hops to be travelled. In order to estimate the speed at which data movement can take place, it is necessary to estimate the bandwidth capability at the source storage system, at the target storage system and the network in between.
In the next two sections, we will present two unique features of Stork: aggregation of data transfer jobs considering their source and destination addresses and the application-level throughput estimation and optimization service.
4. Request aggregation
We have implemented the aggregation of data transfer requests in the Stork data scheduler in order to minimize the overhead of connection set-up and tear-down for each transfer. Inspired by prefetching and caching techniques in microprocessor design, data placement jobs are combined and embedded into a single request, which increases the overall performance, especially for transfers of datasets with small file sizes.
The Stork data scheduler accepts multiple data transfer jobs in a non-deterministic order. For each transfer operation, we need to initialize the network transfer protocol and set up a connection to a remote data transfer service. A single transfer $t$ consists of $t_{\mathrm{setup}}$, in which we initialize the transfer module and prepare a connection to transfer the data, and $t_{\mathrm{transfer}}$, in which we transfer data using the data transfer protocol over a network. Although this connection time is small compared with the total duration spent for transferring the data, it becomes quite important if there are hundreds of jobs to be scheduled. We have minimized the set-up load imposed by multiple transfer jobs. Instead of having separate set-up operations such as $t_1 = t_{\mathrm{setup}} + t^{1}_{\mathrm{transfer}}$, $t_2 = t_{\mathrm{setup}} + t^{2}_{\mathrm{transfer}}$, …, $t_n = t_{\mathrm{setup}} + t^{n}_{\mathrm{transfer}}$, we aggregate multiple requests to improve the total transfer time: $t = t_{\mathrm{setup}} + t^{1}_{\mathrm{transfer}} + t^{2}_{\mathrm{transfer}} + \cdots + t^{n}_{\mathrm{transfer}}$.
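The effect of paying the set-up cost once instead of once per transfer can be illustrated with a short calculation. The function names and the timings are hypothetical; the sketch assumes a constant set-up time and sequential transfers.

```python
def separate_time(t_setup, transfer_times):
    """Total time when every transfer pays its own connection set-up."""
    return sum(t_setup + t for t in transfer_times)


def aggregated_time(t_setup, transfer_times):
    """Total time when all transfers share a single connection set-up."""
    return t_setup + sum(transfer_times)


# Hypothetical workload: 100 small files, 0.1 s each, 0.5 s set-up.
transfers = [0.1] * 100
saving = separate_time(0.5, transfers) - aggregated_time(0.5, transfers)
print(round(separate_time(0.5, transfers), 3))
print(round(aggregated_time(0.5, transfers), 3))
print(round(saving, 3))
```

With these illustrative numbers the separate transfers take about 60 s in total and the aggregated request about 10.5 s. The saving is (n − 1) times the set-up time, here 49.5 s, which is why the benefit is largest for many small files.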
Aggregating data placement jobs and combining data transfer requests into a single operation also has its benefits in terms of improving the overall scheduler performance, by reducing the total number of requests that the data scheduler needs to execute separately. Instead of initiating each request one at a time, it is more beneficial to execute them as a single operation if they are requesting data from the same storage site or using the same protocol to access data. Figure 1 presents the performance gained by aggregating the file transfer requests. Table 1 shows the overhead in the connection set-up. The test environment includes host machines from the Louisiana Optical Networking Initiative (LONI) network. Transfer operations are performed using the GridFTP protocol. The average round-trip delay times between test hosts are 5.131 and 0.548 ms. As latency increases, the effect of overhead in connection time increases. We see better throughput results with aggregated requests. The main performance gain comes from reducing the protocol overhead and the number of independent network connections.
We have successfully applied job aggregation in the Stork scheduler such that the total throughput is increased by reducing the number of transfer operations. According to the file size and source/destination pairs, data placement requests are combined and processed as a single transfer job. Information about aggregated requests is stored in a transparent manner. A main job that includes multiple requests is defined virtually and it is used to perform the transfer operation. Therefore, users can query and get status reports individually without knowing that their requests are aggregated and being executed with others. Stork performs a simple search in the job queue to determine which requests can be aggregated. In this step, our main criteria are that data placement jobs need to have the same user privileges and should be asking for the same data transfer protocol. We have seen a major increase in total throughput of data transfers, especially with small data files, simply by combining data placement jobs based on their source and destination addresses.
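The queue search described above can be sketched as a grouping step. This is an assumption-laden illustration, not Stork's implementation: the job dictionary keys (`id`, `user`, `src`, `dest`), the example host names and the function `aggregate_requests` are hypothetical, and the sketch groups only on user, protocol and source/destination hosts, ignoring the file-size criterion.

```python
from collections import defaultdict
from urllib.parse import urlparse


def aggregate_requests(queue):
    """Group queued jobs that could share one transfer operation.

    Jobs are grouped when they have the same user, the same transfer
    protocol (URL scheme) and the same source and destination hosts.
    Returns a list of groups, each a list of job ids in queue order.
    """
    groups = defaultdict(list)
    for job in queue:
        src = urlparse(job["src"])
        dest = urlparse(job["dest"])
        key = (job["user"], src.scheme, src.netloc, dest.scheme, dest.netloc)
        groups[key].append(job["id"])
    return list(groups.values())


# Hypothetical queue; host names are placeholders.
queue = [
    {"id": 1, "user": "alice", "src": "gsiftp://a.example.org/d/f1",
     "dest": "gsiftp://b.example.org/d/f1"},
    {"id": 2, "user": "alice", "src": "gsiftp://a.example.org/d/f2",
     "dest": "gsiftp://b.example.org/d/f2"},
    {"id": 3, "user": "bob", "src": "gsiftp://a.example.org/d/f3",
     "dest": "gsiftp://b.example.org/d/f3"},
]
print(aggregate_requests(queue))
```

Jobs 1 and 2 end up in one group and can be executed as a single aggregated transfer, while job 3 stays separate because it belongs to a different user. Keeping the individual job ids inside each group is what would allow per-job status queries to keep working.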
5. Throughput optimization
In data scheduling, the effective use of available network throughput and optimization of data transfer speed is crucial for end-to-end application performance. The throughput optimization in the Stork data scheduler is done by opening parallel streams and setting the optimal parallel stream number specific to each transfer. The intelligent selection of this number is based on a novel mathematical model we have developed that predicts the peak point of the throughput of parallel streams and the corresponding stream number for that value. The throughput of $n$ streams ($Th_n$) is calculated by equation (5.1).
The unknown variables a′, b′ and c′ are calculated based on the throughput samplings of three different parallelism data points. The detailed derivation of this equation and variables as well as the selection strategy of the data points (n1,n2,n3) are explained in our previous work .
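To illustrate how three throughput samples can determine three coefficients and a predicted peak, the sketch below assumes, for illustration only, a full second-order model of the form Th_n = n / sqrt(a·n² + b·n + c). This assumed form, the function names and the synthetic sample values are not taken from Stork and may differ from the model actually used there. Under the assumed form, n²/Th_n² is linear in the coefficients, so three samples give a 3×3 linear system.

```python
import math


def det3(m):
    """Determinant of a 3x3 matrix given as a list of rows."""
    (a, b, c), (d, e, f), (g, h, i) = m
    return a * (e * i - f * h) - b * (d * i - f * g) + c * (d * h - e * g)


def fit_coefficients(samples):
    """Solve for (a, b, c) from three (n, throughput) samples.

    Assumed model: Th_n = n / sqrt(a*n**2 + b*n + c), which rearranges to
    the linear equations a*n**2 + b*n + c = n**2 / Th_n**2.
    """
    rows = [[float(n * n), float(n), 1.0] for n, _ in samples]
    rhs = [n * n / (th * th) for n, th in samples]
    d = det3(rows)
    coeffs = []
    for col in range(3):
        # Cramer's rule: replace one column with the right-hand side.
        m = [row[:] for row in rows]
        for r in range(3):
            m[r][col] = rhs[r]
        coeffs.append(det3(m) / d)
    return tuple(coeffs)


def predict(n, a, b, c):
    """Throughput of n streams under the assumed model."""
    return n / math.sqrt(a * n * n + b * n + c)


def optimal_streams(a, b, c):
    """Peak of the assumed model; dTh/dn = 0 gives n = -2c/b."""
    if b >= 0 or c <= 0:
        return None  # no interior maximum under this model
    return -2.0 * c / b


# Synthetic samples generated from known coefficients (not measured data).
true = (0.02, -0.2, 1.0)
samples = [(n, predict(n, *true)) for n in (1, 8, 16)]
a, b, c = fit_coefficients(samples)
print(round(optimal_streams(a, b, c), 3))
```

The samples here are generated from known coefficients so that the fit can be checked against them; with real measurements the three sampling points would have to be chosen so that they bracket the region where throughput stops increasing.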
The throughput increases as the stream number is increased, and it reaches its peak point either when congestion occurs or when the end-system capacities are reached. Further increasing the stream number does not increase the throughput but causes it to drop owing to losses. In figure 2, we present the characteristics of the throughput curve for local area network (LAN) and WAN transfers. In both cases, the throughput has a peak point that indicates the optimal parallel stream number and it starts to drop for larger numbers of streams. The prediction model we have developed (dynamic model order by Newton’s iteration) can predict the peak point by using the throughput values of parallelism levels 1, 8 and 16. Our model is much more accurate than the model from previous work (the Dinda model).
In cases where network optimization is not sufficient and the end-system characteristics play a decisive role in limiting the achievable throughput, making concurrent requests for multiple data transfers can improve the total throughput. The Stork data scheduler also enables the users to set a concurrency level in the server set-up and allows multiple data transfer jobs to be handled concurrently. However, it is up to the user to set this property.
We compare concurrency and parallelism through a set of different test cases to demonstrate the situations where a concurrent transfer can also improve an optimized transfer with parallel streams. The first test case shows the memory-to-memory transfers with parallel streams and different concurrency levels between a local-area workstation with a 100 Mbps network interface and a cluster I/O node in the LONI network with a 10 Gbps network interface. The speed of the transfer is bound by the local-area workstation. In figure 3a, increasing the concurrency level does not provide a more significant improvement than increasing the parallel stream values. Thus, the throughput of four parallel streams is almost as good as the throughput of concurrency levels 2 and 4. The reason behind this outcome is that the throughput is bound by the interface, while the CPU is fast enough and does not present a bottleneck. Next, we look into the case where we conducted transfers between two clusters with 10 Gbps network interfaces in the LONI network (figure 3b). Each I/O node has four CPU cores. With parallel streams, we were able to achieve a throughput value of 6 Gbps. However, when we increased the concurrency level, we were able to achieve around 8 Gbps with a combination of a concurrency level of 2 and a parallel stream number of 32. This significant increase is due to the fact that a single CPU reaches its upper limit with a single request but, through concurrency, multiple CPUs are used until the network limit is reached.
The effect of concurrency is much more significant for disk-to-disk transfers where multiple parallel disks are available and managed by parallel file systems. Figure 4 presents the results of disk-to-disk transfers between the I/O nodes of two clusters in the LONI network with 10 Gbps network interfaces. The disk system is managed by the Lustre parallel file system. The x-axis represents the range of parallel stream numbers, and the concurrency level ranges between 1 and 10. As the concurrency level is increased, the data, totalling 12 Gbps, are divided among the concurrent requests. While the total throughput represents the total number of bytes transferred in unit time, the average throughput is calculated by the number of bytes transferred in unit time by each concurrent request. Parallel streams improve the throughput for a single concurrency level by increasing it from 500 to 750 Mbps. However, owing to serial disk access, this improvement is limited. Only by increasing the concurrency level can we improve the total throughput. The throughput in figure 4a is increased to 2 Gbps at a concurrency level of 4. After that point, increasing the concurrency level causes the throughput to be unstable with sudden ups and downs; however, it is always around 2 Gbps. This value is due to the end-system CPU limitations. If the number of nodes were increased, better throughput results could be seen. As we look into the figure for average throughput, the transfer speed per request falls as we increase the concurrency level (figure 4b).
We tested our throughput optimization embedded into the Stork data scheduler by combining it with different concurrency levels set for the server. Similar concurrency and data sizes are used as in the previous experiment for disk-to-disk transfers. For each concurrency level, we have submitted 30 jobs to the server and measured the total throughput as the total amount of data transferred divided by the time difference between the first job submitted to the scheduler and the last job finished. The average throughput is calculated as the throughput of each data transfer job separately based on the job start and finish time. When we look into the total throughput results in figure 5a, similar trends are seen. The optimization service provides 750 Mbps throughput for a single concurrency level and it increases up to 2 Gbps for a concurrency level of 4. The average throughput decreases more steeply after that level (figure 5b). Also, the optimal parallel stream number decreases and adapts to the concurrency level (figure 5c). From the experiments, it can be seen that an appropriately chosen concurrency level may improve the transfer throughput significantly.
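The two metrics used in this experiment can be written down explicitly. The sketch below is illustrative: the `Job` record and its field names are hypothetical, the numbers are made up, and taking the arithmetic mean of the per-job throughputs is an assumption about how the per-job values are combined.

```python
from collections import namedtuple

# Hypothetical job record; times in seconds, size in bytes.
Job = namedtuple("Job", ["submitted", "started", "finished", "size"])


def total_throughput(jobs):
    """All data moved, divided by the span from first submit to last finish."""
    first_submit = min(j.submitted for j in jobs)
    last_finish = max(j.finished for j in jobs)
    return sum(j.size for j in jobs) / (last_finish - first_submit)


def average_throughput(jobs):
    """Mean of each job's own throughput, from its start to its finish."""
    per_job = [j.size / (j.finished - j.started) for j in jobs]
    return sum(per_job) / len(per_job)


# Two made-up jobs running concurrently for 10 s, 1000 bytes each.
jobs = [Job(0.0, 0.0, 10.0, 1000), Job(0.0, 0.0, 10.0, 1000)]
print(total_throughput(jobs))    # bytes per second across all jobs
print(average_throughput(jobs))  # bytes per second per job
```

In this toy case the total throughput is twice the average throughput, which mirrors the trend reported above: concurrency can raise the total while the rate seen by each individual request falls.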
6. Conclusions
In this paper, we have discussed the limitations of the traditional CPU-oriented batch schedulers in handling the challenging data management problem of large-scale distributed e-Science applications. We have elaborated on how we can bring the concept of ‘data awareness’ to several of the most crucial distributed computing components, such as scheduling, workflow management and end-to-end throughput optimization. In this new paradigm, data placement activities are represented as full-featured jobs in the end-to-end workflow, and they are queued, managed, scheduled and optimized via a specialized data-aware scheduler.
As part of this new paradigm, we have developed a set of tools for mitigating the data bottleneck in distributed computing systems, which consists of three main components: a data scheduler that provides capabilities, such as planning, scheduling, resource reservation, job execution and error recovery for data movement tasks; integration of these capabilities with the other layers in distributed computing such as workflow planning; and further optimization of data movement tasks via aggregation of data transfer jobs considering their source and destination addresses, and through application-level throughput optimization. We have described how these last two features are implemented in Stork and their effects on end-to-end data transfer performance. Our results show that the optimizations performed by the Stork data scheduler help to achieve much higher end-to-end throughput in data transfers than non-optimized approaches.
We believe that the Stork data scheduler and this new ‘data-aware distributed computing’ paradigm will have an impact on all traditional compute and data-intensive e-Science disciplines, as well as new emerging computational areas in the arts, humanities, business and education that need to deal with increasingly large amounts of data.
This project is in part sponsored by the National Science Foundation under award numbers CNS-0846052 (CAREER), CNS-0619843 (PetaShare), OCI-0926701 (Stork) and EPS-0701491 (CyberTools), and by the Board of Regents, State of Louisiana, under contract number NSF/LEQSF (2007-10)-CyberRII-01.
One contribution of 12 to a Theme Issue ‘e-Science: novel research, new science and enduring impact’.
This journal is © 2011 The Royal Society