Amazon EMR Serverless observability, Part 1: Monitor Amazon EMR Serverless workers in near real time using Amazon CloudWatch


Amazon EMR Serverless allows you to run open source big data frameworks such as Apache Spark and Apache Hive without managing clusters and servers. With EMR Serverless, you can run analytics workloads at any scale with automatic scaling that resizes resources in seconds to meet changing data volumes and processing requirements.

We have launched job worker metrics in Amazon CloudWatch for EMR Serverless. This feature allows you to monitor vCPUs, memory, ephemeral storage, and disk I/O allocation and usage metrics at an aggregate worker level for your Spark and Hive jobs.

This post is part of a series about EMR Serverless observability. In this post, we discuss how to use these CloudWatch metrics to monitor EMR Serverless workers in near real time.

CloudWatch metrics for EMR Serverless

At the per-Spark job level, EMR Serverless emits the following new metrics to CloudWatch for both driver and executors. These metrics provide granular insights into job performance, bottlenecks, and resource utilization.

Metric | Description
WorkerCpuAllocated | The total number of vCPU cores allocated for workers in a job run
WorkerCpuUsed | The total number of vCPU cores utilized by workers in a job run
WorkerMemoryAllocated | The total memory in GB allocated for workers in a job run
WorkerMemoryUsed | The total memory in GB utilized by workers in a job run
WorkerEphemeralStorageAllocated | The number of bytes of ephemeral storage allocated for workers in a job run
WorkerEphemeralStorageUsed | The number of bytes of ephemeral storage used by workers in a job run
WorkerStorageReadBytes | The number of bytes read from storage by workers in a job run
WorkerStorageWriteBytes | The number of bytes written to storage from workers in a job run
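If you want to confirm which of these metrics (and which dimension combinations) your jobs are emitting, you can list them with the AWS CLI. The following is a minimal sketch, not part of the dashboard solution. It assumes the metrics are published under the AWS/EMRServerless namespace, which you should verify in your own account, and list_worker_metrics is a hypothetical helper name.

```shell
# Metric names taken from the list above.
WORKER_METRICS="WorkerCpuAllocated WorkerCpuUsed WorkerMemoryAllocated WorkerMemoryUsed WorkerEphemeralStorageAllocated WorkerEphemeralStorageUsed WorkerStorageReadBytes WorkerStorageWriteBytes"

# Assumption: EMR Serverless publishes its metrics to this namespace.
EMRS_NAMESPACE="AWS/EMRServerless"

# list_worker_metrics (hypothetical helper): for each worker metric, print the
# metric/dimension combinations that CloudWatch currently knows about.
list_worker_metrics() {
  for metric in $WORKER_METRICS; do
    aws cloudwatch list-metrics \
      --namespace "$EMRS_NAMESPACE" \
      --metric-name "$metric" \
      --output table
  done
}

# Usage (requires AWS credentials and at least one completed or running job):
#   list_worker_metrics
```

The dimension names and values in the output are what you would pass to other CloudWatch commands when querying or alarming on a specific job run.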

The following are the benefits of monitoring your EMR Serverless jobs with CloudWatch:

  • Optimize resource utilization – You can gain insights into resource utilization patterns and optimize your EMR Serverless configurations for better efficiency and cost savings. For example, underutilization of vCPUs or memory can reveal resource wastage, allowing you to optimize worker sizes to achieve potential cost savings.
  • Diagnose common errors – You can identify root causes and mitigation for common errors without log diving. For example, you can monitor the usage of ephemeral storage and mitigate disk bottlenecks by preemptively allocating more storage per worker.
  • Gain near real-time insights – CloudWatch offers near real-time monitoring capabilities, allowing you to track the performance of your EMR Serverless jobs as and when they are running, for quick detection of any anomalies or performance issues.
  • Configure alerts and notifications – CloudWatch enables you to set up alarms using Amazon Simple Notification Service (Amazon SNS) based on predefined thresholds, allowing you to receive notifications through email or text message when specific metrics reach critical levels.
  • Conduct historical analysis – CloudWatch stores historical data, allowing you to analyze trends over time, identify patterns, and make informed decisions for capacity planning and workload optimization.
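As an illustration of the alerting use case, the following sketch creates a CloudWatch alarm on WorkerEphemeralStorageUsed that notifies an SNS topic. It rests on several assumptions: the AWS/EMRServerless namespace, a 60-second period, and a Maximum statistic are illustrative choices, and create_storage_alarm and gb_to_bytes are hypothetical helper names. The dimensions are passed in rather than hardcoded, so you can copy them from the output of aws cloudwatch list-metrics for your own job.

```shell
# gb_to_bytes: convert decimal gigabytes to bytes, because the ephemeral
# storage metrics are reported in bytes.
gb_to_bytes() {
  awk -v gb="$1" 'BEGIN { printf "%.0f\n", gb * 1000 * 1000 * 1000 }'
}

# create_storage_alarm (hypothetical helper):
#   $1 = alarm name
#   $2 = SNS topic ARN to notify
#   $3 = threshold in GB
#   $4 = dimensions in CLI shorthand, e.g. "Name=...,Value=... Name=...,Value=..."
create_storage_alarm() {
  # $4 is intentionally unquoted so each Name=...,Value=... pair is passed
  # as its own argument.
  aws cloudwatch put-metric-alarm \
    --alarm-name "$1" \
    --namespace "AWS/EMRServerless" \
    --metric-name WorkerEphemeralStorageUsed \
    --dimensions $4 \
    --statistic Maximum \
    --period 60 \
    --evaluation-periods 3 \
    --threshold "$(gb_to_bytes "$3")" \
    --comparison-operator GreaterThanThreshold \
    --alarm-actions "$2"
}
```

The threshold, period, and number of evaluation periods are placeholders; choose values that match how quickly your jobs consume disk.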

Solution overview

To further enhance this observability experience, we have created a solution that gathers all these metrics on a single CloudWatch dashboard for an EMR Serverless application. You need to launch one AWS CloudFormation template per EMR Serverless application. You can monitor all the jobs submitted to a single EMR Serverless application using the same CloudWatch dashboard. To learn more about this dashboard and deploy this solution into your own account, refer to the EMR Serverless CloudWatch Dashboard GitHub repository.

In the following sections, we walk you through how you can use this dashboard to perform the following actions:

  • Optimize your resource utilization to save costs without impacting job performance
  • Diagnose failures due to common errors without the need for log diving and resolve those errors optimally

Prerequisites

To run the sample jobs provided in this post, you need to create an EMR Serverless application with default settings using the AWS Management Console or AWS Command Line Interface (AWS CLI), and then launch the CloudFormation template from the GitHub repo with the EMR Serverless application ID provided as the input to the template.

You need to submit all the jobs in this post to the same EMR Serverless application. If you want to monitor a different application, you can deploy this template for your own EMR Serverless application ID.

Optimize resource utilization

When running Spark jobs, you often start with the default configurations. It can be challenging to optimize your workload without any visibility into actual resource utilization. Some of the most common configurations that we’ve seen customers adjust are spark.driver.cores, spark.driver.memory, spark.executor.cores, and spark.executor.memory.

To illustrate how the newly added CloudWatch dashboard worker-level metrics can help you fine-tune your job configurations for better price-performance and enhanced resource utilization, let’s run the following Spark job, which uses the NOAA Integrated Surface Database (ISD) dataset to run some transformations and aggregations.

Use the following command to run this job on EMR Serverless. Provide your Amazon Simple Storage Service (Amazon S3) bucket and EMR Serverless application ID for which you launched the CloudFormation template. Make sure to use the same application ID to submit all the sample jobs in this post. Additionally, provide an AWS Identity and Access Management (IAM) runtime role.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-1 \
 --application-id <APPLICATION_ID> \
 --execution-role-arn <JOB_ROLE_ARN> \
 --job-driver '{
 "sparkSubmit": {
 "entryPoint": "s3://<BUCKET_NAME>/scripts/windycity.py",
 "entryPointArguments": ["s3://noaa-global-hourly-pds/2024/", "s3://<BUCKET_NAME>/emrs-cw-dashboard-test-1/"]
 } }'
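The start-job-run command returns a jobRunId. If you prefer to follow the job from the terminal while the dashboard populates, you can poll its state. The following is a sketch under stated assumptions: wait_for_job_run and is_terminal_state are hypothetical helper names, and the 30-second polling interval is arbitrary.

```shell
# is_terminal_state: succeeds (exit 0) when a job run state is final.
is_terminal_state() {
  case "$1" in
    SUCCESS|FAILED|CANCELLED) return 0 ;;
    *) return 1 ;;
  esac
}

# wait_for_job_run (hypothetical helper): poll a job run until it finishes.
#   $1 = EMR Serverless application ID, $2 = job run ID
# Exits 0 only if the job run ends in the SUCCESS state.
wait_for_job_run() {
  app_id="$1"
  run_id="$2"
  while :; do
    state=$(aws emr-serverless get-job-run \
      --application-id "$app_id" \
      --job-run-id "$run_id" \
      --query 'jobRun.state' --output text) || return 1
    echo "state: $state"
    if is_terminal_state "$state"; then
      break
    fi
    sleep 30
  done
  [ "$state" = "SUCCESS" ]
}

# Usage (requires AWS credentials):
#   wait_for_job_run <APPLICATION_ID> <JOB_RUN_ID>
```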

Now let’s check the executor vCPUs and memory from the CloudWatch dashboard.

This job was submitted with default EMR Serverless Spark configurations. From the Executor CPU Allocated metric in the preceding screenshot, the job was allocated 396 vCPUs in total (99 executors * 4 vCPUs per executor). However, the job only used a maximum of 110 vCPUs based on Executor CPU Used. This indicates oversubscription of vCPU resources. Similarly, the job was allocated 1,584 GB memory in total based on Executor Memory Allocated. However, from the Executor Memory Used metric, we see that the job only used 176 GB of memory during the job, indicating memory oversubscription.
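To put these numbers in perspective, it helps to express peak usage as a percentage of the allocation. The following small sketch does the arithmetic with the values read from the dashboard; pct_used is a hypothetical helper name.

```shell
# pct_used: peak usage as a percentage of the allocated amount.
#   $1 = used, $2 = allocated (same unit for both)
pct_used() {
  awk -v used="$1" -v alloc="$2" 'BEGIN { printf "%.1f\n", 100 * used / alloc }'
}

pct_used 110 396     # executor vCPUs: prints 27.8
pct_used 176 1584    # executor memory in GB: prints 11.1
```

In other words, at its peak the job used roughly 28% of the allocated vCPUs and 11% of the allocated memory, which is the headroom the adjusted configuration aims to remove.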

Now let’s rerun this job with the following adjusted configurations.

Item | Original Job (Default Configuration) | Rerun Job (Adjusted Configuration)
spark.executor.memory | 14 GB | 3 GB
spark.executor.cores | 4 | 2
spark.dynamicAllocation.maxExecutors | 99 | 30
Total Resource Utilization | 6.521 vCPU-hours, 26.084 memoryGB-hours, 32.606 storageGB-hours | 1.739 vCPU-hours, 3.688 memoryGB-hours, 17.394 storageGB-hours
Billable Resource Utilization | 7.046 vCPU-hours, 28.182 memoryGB-hours, 0 storageGB-hours | 1.739 vCPU-hours, 3.688 memoryGB-hours, 0 storageGB-hours

We use the following code:

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-2 \
 --application-id <APPLICATION_ID> \
 --execution-role-arn <JOB_ROLE_ARN> \
 --job-driver '{
 "sparkSubmit": {
 "entryPoint": "s3://<BUCKET_NAME>/scripts/windycity.py",
 "entryPointArguments": ["s3://noaa-global-hourly-pds/2024/", "s3://<BUCKET_NAME>/emrs-cw-dashboard-test-2/"],
 "sparkSubmitParameters": "--conf spark.driver.cores=2 --conf spark.driver.memory=3g --conf spark.executor.memory=3g --conf spark.executor.cores=2 --conf spark.dynamicAllocation.maxExecutors=30"
 } }'

Let’s check the executor metrics from the CloudWatch dashboard again for this job run.

In the second job, we see lower allocation of both vCPUs (396 vs. 60) and memory (1,584 GB vs. 120 GB) as expected, resulting in better utilization of resources. The original job ran for 4 minutes, 41 seconds. The second job took 4 minutes, 54 seconds. This reconfiguration resulted in approximately 79% lower cost, with only a 13-second increase in job duration.
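You can cross-check the savings from the billable resource utilization figures of the two runs. The following sketch computes the reduction per resource and for an estimated cost. The per-unit rates are assumed, illustrative values only (pricing varies by Region and architecture and changes over time), and pct_reduction and job_cost are hypothetical helper names. Billable storage is 0 for both runs, so it is left out.

```shell
# pct_reduction: percentage drop from an original value to a new value.
pct_reduction() {
  awk -v before="$1" -v after="$2" 'BEGIN { printf "%.1f\n", 100 * (before - after) / before }'
}

# job_cost: estimated compute cost of a job run.
#   $1 = vCPU-hours, $2 = memoryGB-hours,
#   $3 = rate per vCPU-hour, $4 = rate per memoryGB-hour
job_cost() {
  awk -v vcpu="$1" -v mem="$2" -v vrate="$3" -v mrate="$4" \
    'BEGIN { printf "%.6f\n", vcpu * vrate + mem * mrate }'
}

# Assumed illustrative rates in USD; replace with current pricing for your Region.
VCPU_RATE=0.052624
MEM_RATE=0.0057785

pct_reduction 7.046 1.739       # billable vCPU-hours: prints 75.3
pct_reduction 28.182 3.688      # billable memoryGB-hours: prints 86.9

# Estimated cost reduction with the assumed rates: prints 78.9
pct_reduction "$(job_cost 7.046 28.182 "$VCPU_RATE" "$MEM_RATE")" \
              "$(job_cost 1.739 3.688 "$VCPU_RATE" "$MEM_RATE")"
```

With these assumed rates, the estimate lands just under 79%, in line with the savings reported for the rerun job.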

You can use these metrics to further optimize your job by increasing or decreasing the number of workers or the allocated resources.

Diagnose and resolve job failures

Using the CloudWatch dashboard, you can diagnose job failures due to issues related to CPU, memory, and storage such as out of memory or no space left on the device. This enables you to identify and resolve common errors quickly without having to check the logs or navigate through Spark History Server. Additionally, because you can check the resource utilization from the dashboard, you can fine-tune the configurations by increasing the required resources only as much as needed instead of oversubscribing to the resources, which further saves costs.

Driver errors

To illustrate this use case, let’s run the following Spark job, which creates a large Spark data frame with a few million rows. Typically, this operation is done by the Spark driver. While submitting the job, we also configure spark.rpc.message.maxSize, because it’s required for task serialization of data frames with a large number of columns.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-3 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/create-large-disk.py",
"sparkSubmitParameters": "--conf spark.rpc.message.maxSize=2000"
} }'

After a few minutes, the job failed with the error message “Encountered errors when releasing containers,” as seen in the Job details section.

When you encounter a non-descriptive error message like this one, you would normally examine the driver and executor logs to troubleshoot. But before log diving, let’s first check the CloudWatch dashboard, specifically the driver metrics, because releasing containers is generally performed by the driver.

We can see that the Driver CPU Used and Driver Storage Used are well within their respective allocated values. However, upon checking Driver Memory Allocated and Driver Memory Used, we can see that the driver was using all of the 16 GB memory allocated to it. By default, EMR Serverless drivers are assigned 16 GB memory.

Let’s rerun the job with more driver memory allocated. Let’s set driver memory to 27 GB as the starting point, because spark.driver.memory + spark.driver.memoryOverhead should be less than 30 GB for the default worker type. spark.rpc.message.maxSize remains unchanged.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-4 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/create-large-disk.py",
"sparkSubmitParameters": "--conf spark.driver.memory=27G --conf spark.rpc.message.maxSize=2000"
} }'
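The choice of 27 GB follows from the memory overhead that is added on top of spark.driver.memory. The following sketch checks a candidate value against the 30 GB limit. It assumes the overhead is not set explicitly and defaults to 10% of driver memory with a 384 MiB minimum (Spark's documented default; verify it for your release), and driver_total_gb is a hypothetical helper name.

```shell
# driver_total_gb: spark.driver.memory plus the assumed default overhead.
#   $1 = spark.driver.memory in GB
#   $2 = overhead factor (optional, assumed default 0.10)
driver_total_gb() {
  awk -v mem="$1" -v factor="${2:-0.10}" 'BEGIN {
    overhead = mem * factor
    if (overhead < 0.375) overhead = 0.375   # 384 MiB floor
    printf "%.1f\n", mem + overhead
  }'
}

driver_total_gb 27    # prints 29.7, which stays under the 30 GB limit
driver_total_gb 28    # prints 30.8, which would exceed it
```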

The job succeeded this time around. Let’s check the CloudWatch dashboard to observe driver memory utilization.

As we can see, the allocated memory is now 30 GB, but the actual driver memory utilization didn’t exceed 21 GB during the job run. Therefore, we can further optimize costs here by reducing the value of spark.driver.memory. We reran the same job with spark.driver.memory set to 22 GB, and the job still succeeded with better driver memory utilization.

Executor errors

Using CloudWatch for observability is ideal for diagnosing driver-related issues because there is only one driver per job, so the driver resources used reflect the actual usage of that single driver. Executor metrics, on the other hand, are aggregated across all the workers. Even so, you can use this dashboard to provide only an adequate amount of resources to make your job succeed, thereby avoiding oversubscription of resources.

To illustrate, let’s run the following Spark job, which simulates uniform disk over-utilization across all workers by processing very large NOAA datasets from several years. This job also transiently caches a very large data frame on disk.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-5 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/noaa-disk.py"
} }'

After a few minutes, we can see that the job failed with a “No space left on device” error in the Job details section, which indicates that some of the workers have run out of disk space.

Checking the Running Executors metric from the dashboard, we can identify that there were 99 executor workers running. Each worker comes with 20 GB storage by default.

Because this is a Spark task failure, let’s check the Executor Storage Allocated and Executor Storage Used metrics from the dashboard (because the driver won’t run any tasks).

As we can see, the 99 executors have used up a total of 1,940 GB from the total allocated executor storage of 2,126 GB. This includes both the data shuffled by the executors and the storage used for caching the data frame. We don’t see the full 2,126 GB being utilized from this graph because there might be a few executors out of the 99 executors that weren’t holding much data when the job failed (before these executors could start processing tasks and store the data frame chunks).

Let’s rerun the same job but with increased executor disk size using the parameter spark.emr-serverless.executor.disk. Let’s try with 40 GB disk per executor as a starting point.

aws emr-serverless start-job-run \
--name emrs-cw-dashboard-test-6 \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_ROLE_ARN> \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://<BUCKET_NAME>/scripts/noaa-disk.py",
"sparkSubmitParameters": "--conf spark.emr-serverless.executor.disk=40G"
}
}'

This time, the job ran successfully. Let’s check the Executor Storage Allocated and Executor Storage Used metrics.

Executor Storage Allocated is now 4,251 GB because we’ve doubled the value of spark.emr-serverless.executor.disk. Although there is now twice as much aggregated executor storage, the job still used only a maximum of 1,940 GB out of 4,251 GB. This indicates that our executors were likely running out of disk space by only a few GB. Therefore, we can try to set spark.emr-serverless.executor.disk to a lower value like 25 GB or 30 GB instead of 40 GB to save storage costs, as we did in the previous scenario. In addition, you can monitor Executor Storage Read Bytes and Executor Storage Write Bytes to see if your job is I/O intensive. If it is, you can use the Shuffle-optimized disks feature of EMR Serverless to further enhance your job’s I/O performance.
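One rough way to arrive at a lower disk size is to divide the peak aggregate usage by the number of executors and add some headroom. The following sketch does that. It assumes the data is evenly distributed across executors, treats the dashboard values as approximate, and uses arbitrary headroom percentages; suggest_disk_gb is a hypothetical helper name.

```shell
# suggest_disk_gb: per-executor disk estimate, rounded up to a whole GB.
#   $1 = peak aggregate storage used in GB
#   $2 = number of executors
#   $3 = headroom as a fraction (optional, assumed default 0.25)
suggest_disk_gb() {
  awk -v used="$1" -v n="$2" -v headroom="${3:-0.25}" 'BEGIN {
    per_executor = used / n * (1 + headroom)
    size = int(per_executor)
    if (size < per_executor) size++   # round up
    print size
  }'
}

suggest_disk_gb 1940 99        # 25% headroom: prints 25
suggest_disk_gb 1940 99 0.5    # 50% headroom: prints 30
```

Treat the result as a starting point for the next run rather than a guarantee, because an aggregate metric can't show whether individual executors need more than the average.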

The dashboard is also useful to capture information about transient storage used while caching or persisting the data frames, including spill-to-disk scenarios. The Storage tab of Spark History Server records any caching activities, as seen in the following screenshot. However, this data will be lost from Spark History Server after the cache is evicted or when the job finishes. Therefore, Executor Storage Used can be used to do an analysis of a failed job run due to transient storage issues.

In this particular example, the data was evenly distributed among the executors. However, if you have a data skew (for example, only 1–2 executors out of 99 process most of the data, and as a result, your job runs out of disk space), the CloudWatch dashboard won’t accurately capture this scenario because the storage data is aggregated across all the executors for a job. For diagnosing issues at the individual executor level, we need to track per-executor-level metrics. We explore more advanced examples of how per-worker-level metrics can help you identify, mitigate, and resolve hard-to-find issues through EMR Serverless integration with Amazon Managed Service for Prometheus.

Conclusion

In this post, you learned how to effectively manage and optimize your EMR Serverless application using a single CloudWatch dashboard with enhanced EMR Serverless metrics. These metrics are available in all AWS Regions where EMR Serverless is available. For more details about this feature, refer to Job-level monitoring.


About the Authors

Kashif Khan is a Sr. Analytics Specialist Solutions Architect at AWS, specializing in big data services like Amazon EMR, AWS Lake Formation, AWS Glue, Amazon Athena, and Amazon DataZone. With over a decade of experience in the big data domain, he possesses extensive expertise in architecting scalable and robust solutions. His role involves providing architectural guidance and collaborating closely with customers to design tailored solutions using AWS analytics services to unlock the full potential of their data.

Veena Vasudevan is a Principal Partner Solutions Architect and Data & AI specialist at AWS. She helps customers and partners build highly optimized, scalable, and secure solutions; modernize their architectures; and migrate their big data, analytics, and AI/ML workloads to AWS.

