
Data streaming applications continuously process incoming data, much like a never-ending query against a database. Unlike traditional database queries where you request data one time and receive a single response, streaming data applications constantly receive new data in real time. This introduces some complexity, particularly around error handling. This post discusses the strategies for handling errors in Apache Flink applications. However, the general principles discussed here apply to stream processing applications at large.
Error handling in streaming applications
When developing stream processing applications, navigating complexities—especially around error handling—is crucial. Fostering data integrity and system reliability requires effective strategies to tackle failures while maintaining high performance. Striking this balance is essential for building resilient streaming applications that can handle real-world demands. In this post, we explore the significance of error handling and outline best practices for achieving both reliability and efficiency.
Before we can talk about how to handle errors in our consumer applications, we first need to consider the two most common types of errors that we encounter: transient and nontransient.
Transient errors, or retryable errors, are temporary issues that usually resolve themselves without requiring significant intervention. These can include network timeouts, temporary service unavailability, or minor glitches that don’t indicate a fundamental problem with the system. The key characteristic of transient errors is that they’re often short-lived and retrying the operation after a brief delay is usually enough to successfully complete the task. We dive deeper into how to implement retries in your system in the following section.
Nontransient errors, on the other hand, are persistent issues that don’t go away with retries and may indicate a more serious underlying problem. These could involve things such as data corruption or business logic violations. Nontransient errors require more comprehensive solutions, such as alerting operators, skipping the problematic data, or routing it to a dead letter queue (DLQ) for manual review and remediation. These errors need to be addressed directly to prevent ongoing issues within the system. For these types of errors, we explore DLQ topics as a viable solution.
Retries
As previously mentioned, retries are mechanisms used to handle transient errors by reprocessing messages that initially failed due to temporary issues. The goal of retries is to make sure that messages are successfully processed when the necessary conditions—such as resource availability—are met. By incorporating a retry mechanism, messages that can’t be processed immediately are reattempted after a delay, increasing the likelihood of successful processing.
We explore this approach through the use of an example based on the Amazon Managed Service for Apache Flink retries with Async I/O code sample. The example focuses on implementing a retry mechanism in a streaming application that calls an external endpoint during processing for purposes such as data enrichment or real-time validation
The application does the following:
- Generates data simulating a streaming data source
- Makes an asynchronous API call to an Amazon API Gateway or AWS Lambda endpoint, which randomly returns success, failure, or timeout. This call is made to emulate the enrichment of the stream with external data, potentially stored in a database or data store.
- Processes the application based on the response returned from the API Gateway endpoint:
-
- If the API Gateway response is successful, processing will continue as normal
- If the API Gateway response times out or returns a retryable error, the record will be retried a configurable number of times
- Reformats the message in a readable format, extracting the result
- Sends messages to the sink topic in our streaming storage layer
In this example, we use an asynchronous request that allows our system to handle many requests and their responses concurrently—increasing the overall throughput of our application. For more information on how to implement asynchronous API calls in Amazon Managed Service for Apache Flink, refer to Enrich your data stream asynchronously using Amazon Kinesis Data Analytics for Apache Flink.
Before we explain the application of retries for the Async function call, here is the AsyncInvoke implementation that will call our external API:
This example uses an AsyncHttpClient
to call an HTTP endpoint that is a proxy to calling a Lambda function. The Lambda function is relatively straightforward, in that it merely returns SUCCESS. Async I/O in Apache Flink allows for making asynchronous requests to an HTTP endpoint for individual records and handles responses as they arrive back to the application. However, Async I/O can work with any asynchronous client that returns a Future
or CompletableFuture
object. This means that you can also query databases and other endpoints that support this return type. If the client in question makes blocking requests or can’t support asynchronous requests with Future
return types, there isn’t any benefit to using Async I/O.
Some helpful notes when defining your Async I/O function:
- Increasing the
capacity
parameter in your Async I/O function call will increase the number of in-flight requests. Keep in mind this will cause some overhead on checkpointing, and will introduce more load to your external system. - Keep in mind that your external requests are saved in application state. If the resulting object from the Async I/O function call is complex, object serialization may fall back to Kryo serialization which can impact performance.
The Async I/O function can process multiple requests concurrently without waiting for each one to be complete before processing the next. Apache Flink’s Async I/O function provides functionality for both ordered and unordered results when receiving responses back from an asynchronous call, giving flexibility based on your use case.
Errors during Async I/O requests
In the case that there is a transient error in your HTTP endpoint, there could be a timeout in the Async HTTP request. The timeout could be caused by the Apache Flink application overwhelming your HTTP endpoint, for example. This will, by default, result in an exception in the Apache Flink job, forcing a job restart from the latest checkpoint, effectively retrying all data from an earlier point in time. This restart strategy is expected and typical for Apache Flink applications, built to withstand errors without data loss or reprocessing of data. Restoring from the checkpoint should result in a fast restart with 30 seconds (P90) of downtime.
Because network errors could be temporary, backing off for a period and retrying the HTTP request could have a different result. Network errors could mean receiving an error status code back from the endpoint, but it could also mean not getting a response at all, and the request timing out. We can handle such cases within the Async I/O framework and use an Async retry strategy to retry the requests as needed. Async retry strategies are invoked when the ResultFuture request to an external endpoint is complete with an exception that you define in the preceding code snippet. The Async retry strategy is defined as follows:
When implementing this retry strategy, it’s important to have a solid understanding of the system you will be querying. How will retries impact performance? In the code snippet, we’re using a FixedDelayRetryStrategy
that retries requests upon error one time every second with a delay of one second. The FixedDelayRetryStrategy
is only one of several available options. Other retry strategies built into Apache Flink’s Async I/O library include the ExponentialBackoffDelayRetryStrategy
, which increases the delay between retries exponentially upon every retry. It’s important to tailor your retry strategy to the specific needs and constraints of your target system.
Additionally, within the retry strategy, you can optionally define what happens when there are no results returned from the system or when there are exceptions. The Async I/O function in Flink uses two important predicates: isResult
and isException
.
The isResult
predicate determines whether a returned value should be considered a valid result. If isResult
returns false, in the case of empty or null responses, it will trigger a retry attempt.
The isException
predicate evaluates whether a given exception should lead to a retry. If isException
returns true for a particular exception, it will initiate a retry. Otherwise, the exception will be propagated and the job will fail.
If there is a timeout, you can override the timeout function within the Async I/O function to return zero results, which will result in a retry in the preceding block. This is also true for exceptions, which will result in retries, depending on the logic you determine to cause the .compleExceptionally()
function to trigger.
By carefully configuring these predicates, you can fine-tune your retry logic to handle various scenarios, such as timeouts, network issues, or specific application-level exceptions, making sure your asynchronous processing is robust and efficient.
One key factor to keep in mind when implementing retries is the potential impact on overall system performance. Retrying operations too aggressively or with insufficient delays can lead to resource contention and reduced throughput. Therefore, it’s crucial to thoroughly test your retry configuration with representative data and loads to make sure you strike the right balance between resilience and efficiency.
A full code sample can be found at the amazon-managed-service-for-apache-flink-examples repository.
Dead letter queue
Although retries are effective for managing transient errors, not all issues can be resolved by reattempting the operation. Nontransient errors, such as data corruption or validation failures, persist despite retries and require a different approach to protect the integrity and reliability of the streaming application. In these cases, the concept of DLQs comes into play as a vital mechanism for capturing and isolating individual messages that can’t be processed successfully.
DLQs are intended to handle nontransient errors affecting individual messages, not system-wide issues, which require a different approach. Additionally, the use of DLQs might impact the order of messages being processed. In cases where processing order is important, implementing a DLQ may require a more detailed approach to make sure it aligns with your specific business use case.
Data corruption can’t be handled in the source operator of the Apache Flink application and will cause the application to fail and restart from the latest checkpoint. This issue will persist unless the message is handled outside of the source operator, downstream in a map operator or similar. Otherwise, the application will continue retrying and retrying.
In this section, we focus on how DLQs in the form of a dead letter sink can be used to separate messages from the main processing application and isolate them for a more focused or manual processing mechanism.
Consider an application that is receiving messages, transforming the data, and sending the results to a message sink. If a message is identified by the system as corrupt, and therefore can’t be processed, merely retrying the operation won’t fix the issue. This could result in the application getting stuck in a continuous loop of retries and failures. To prevent this from happening, such messages can be rerouted to a dead letter sink for further downstream exception handling.
This implementation results in our application having two different sinks: one for successfully processed messages (sink-topic) and one for messages that couldn’t be processed (exception-topic), as shown in the following diagram. To achieve this data flow, we need to “split” our stream so that each message goes to its appropriate sink topic. To do this in our Flink application, we can use side outputs.
The diagram demonstrates the DLQ concept through Amazon Managed Streaming for Apache Kafka topics and an Amazon Managed Service for Apache Flink application. However, this concept can be implemented through other AWS streaming services such as Amazon Kinesis Data Streams.
Side outputs
Using side outputs in Apache Flink, you can direct specific parts of your data stream to different logical streams based on conditions, enabling the efficient management of multiple data flows within a single job. In the context of handling nontransient errors, you can use side outputs to split your stream into two paths: one for successfully processed messages and another for those requiring additional handling (i.e. routing to a dead letter sink). The dead letter sink, often external to the application, means that problematic messages are captured without disrupting the main flow. This approach maintains the integrity of your primary data stream while making sure errors are managed efficiently and in isolation from the overall application.
The following shows how to implement side outputs into your Flink application.
Consider the example that you have a map transformation to identify poison messages and produce a stream of tuples:
Based on the processing result, you know whether you want to send this message to your dead letter sink or continue processing it in your application. Therefore, you need to split the stream to handle the message accordingly:
First create an OutputTag
to route invalid events to a side output stream. This OutputTag
is a typed and named identifier you can use to separately manage and direct specific events, such as invalid ones, to a distinct stream for further handling.
Next, apply a ProcessFunction
to the stream. The ProcessFunction
is a low-level stream processing operation that gives access to the basic building blocks of streaming applications. This operation will process each event and decide its path based on its validity. If an event is marked as invalid, it’s sent to the side output stream defined by the OutputTag
. Valid events are emitted to the main output stream, allowing for continued processing without disruption.
Then retrieve the side output stream for invalid events using getSideOutput(invalidEventsTag)
. You can use this to independently access the events that were tagged and send them to the dead letter sink. The remainder of the messages will remain in the mainStream
, where they can either continue to be processed or be sent to their respective sink:
The following diagram shows this workflow.
A full code sample can be found at the amazon-managed-service-for-apache-flink-examples repository.
What to do with messages in the DLQ
After successfully routing problematic messages to a DLQ using side outputs, the next step is determining how to handle these messages downstream. There isn’t a one-size-fits-all approach for managing dead letter messages. The best strategy depends on your application’s specific needs and the nature of the errors encountered. Some messages might be resolved though specialized applications or automated processing, while others might require manual intervention. Regardless of the approach, it’s crucial to make sure there is sufficient visibility and control over failed messages to facilitate any necessary manual handling.
A common approach is to send notifications through services such as Amazon Simple Notification Service (Amazon SNS), alerting administrators that certain messages weren’t processed successfully. This can help make sure that issues are promptly addressed, reducing the risk of prolonged data loss or system inefficiencies. Notifications can include details about the nature of the failure, enabling quick and informed responses.
Another effective strategy is to store dead letter messages externally from the stream, such as in an Amazon Simple Storage Service (Amazon S3) bucket. By archiving these messages in a central, accessible location, you enhance visibility into what went wrong and provide a long-term record of unprocessed data. This stored data can be reviewed, corrected, and even re-ingested into the stream if necessary.
Ultimately, the goal is to design a downstream handling process that fits your operational needs, providing the right balance of automation and manual oversight.
Conclusion
In this post, we looked at how you can leverage concepts such as retries and dead letter sinks for maintaining the integrity and efficiency of your streaming applications. We demonstrated how you can implement these concepts through Apache Flink code samples highlighting Async I/O and Side Output capabilities:
To supplement, we’ve included the code examples highlighted in this post in the above list. For more details, refer to the respective code samples. It’s best to test these solutions with sample data and known results to understand their respective behaviors.
About the Authors
Alexis Tekin is a Solutions Architect at AWS, working with startups to help them scale and innovate using AWS services. Previously, she supported financial services customers by developing prototype solutions, leveraging her expertise in software development and cloud architecture. Alexis is a former Texas Longhorn, where she graduated with a degree in Management Information Systems from the University of Texas at Austin.
Jeremy Ber has been in the software space for over 10 years with experience ranging from Software Engineering, Data Engineering, Data Science and most recently Streaming Data. He currently serves as a Streaming Specialist Solutions Architect at Amazon Web Services, focused on Amazon Managed Streaming for Apache Kafka (MSK) and Amazon Managed Service for Apache Flink (MSF).