Fan-out Sidekiq Jobs to Manage Large Workloads

Performing a resilient operation on bulk data can be challenging, especially if the operation relies on a third party. You can safely do this by fanning out the work to idempotent background jobs that operate on only one piece of data at a time. Those jobs can retry independently as needed, making the entire operation easier to manage. This post will show an example of how that works and why you might want to use this pattern.

Fanning out is a way to perform work in parallel batches instead of inside a loop. Executing an operation this way provides more control and more resilience. Doing this well requires a combination of both job and database design.

Simple Domain of Charging Subscriptions

Let's take a simplified domain of charging customers a subscription each month. Let's say we have a subscriptions table that has a customer ID, an amount to charge each month, and the date on which to charge them. Each month when we charge them, we'll update that date to be the next month. Let's assume there is a customers table that has some sort of identifier to a third party payment processor as well.

Figure: Domain for Charging Subscriptions. ERD diagram showing a subscription model that references a customer model. The subscription has 'id', 'customer id', 'monthly charge cents', and 'next charge on' fields. The customer has an 'id' field and a 'payment processor id' field. There is an arrow from the subscription's customer id field to the customer's id field.

A simple way of doing this is to loop over each subscription, check if next_charge_on is today and, if so, charge the customer. Assume there is a ThirdPartyPaymentProcessor class that handles talking to our credit card payment service.

We'll put this into a Sidekiq job and arrange for it to run every day.

    class ChargeSubscriptionsJob
      include Sidekiq::Job

      def perform
        payment_processor = ThirdPartyPaymentProcessor.new

        Subscription.where(next_charge_on: Date.today).find_each do |subscription|
          payment_processor.charge!(
            subscription.customer.payment_processor_id,
            subscription.monthly_charge_cents
          )
          subscription.update!(next_charge_on: Date.today + 1.month)
        end
      end
    end

Even at a moderate scale, this can become difficult to manage.

Difficulties with Long-Running Batch Jobs

Suppose our payment processor experiences an outage partway through processing. The job will fail and be retried. The subscription being charged during the failure may or may not have been charged. If it was, retrying this job will charge it again.

What if we have so many subscriptions that we can't charge them all in one job? Most payment processors take a few seconds to complete a charge. If we had 1,000 customers to charge on any given day, that means this job would take about an hour to complete.
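As a rough check on that estimate, here is a small calculation assuming charges run strictly one after another. The per-charge latencies are assumptions chosen for illustration, not measurements (3.6 seconds is included only because it lands on exactly one hour):

```ruby
# Back-of-the-envelope estimate for a job that charges customers one at a time.
# The latencies below are assumed values for illustration only.
def sequential_minutes(customers, seconds_per_charge)
  (customers * seconds_per_charge) / 60.0
end

[2, 3.6, 5].each do |seconds|
  minutes = sequential_minutes(1_000, seconds)
  puts "#{seconds}s per charge => #{minutes.round} minutes for 1,000 customers"
end
```

Even at the optimistic end, the job runs long enough that a deploy or an infrastructure restart during the run becomes likely.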

If you were to deploy, or cycle infrastructure (as is common with cloud-hosted services), it could fail partway through. What if there is some bug or problem with the data such that a particular subscription always causes a failure? If the job processes subscriptions in the same order, it would always fail at the errant subscription, preventing the entire batch from ever completing (a so-called "poison pill").
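The poison pill is easy to see in a small simulation. This is a plain-Ruby sketch with no Sidekiq or database involved; the `charge_all` method, the `POISON_ID` constant, and the in-memory `charged` list are all hypothetical stand-ins for the batch job and its data:

```ruby
# Plain-Ruby simulation of a poison pill. Subscription ids are processed in a
# fixed order, and id 3 always fails, so every retry stops at the same place.
POISON_ID = 3

def charge_all(ids, charged)
  ids.each do |id|
    next if charged.include?(id) # stands in for next_charge_on having moved on
    raise "cannot charge subscription #{id}" if id == POISON_ID

    charged << id
  end
end

charged = []
3.times do |attempt|
  begin
    charge_all([1, 2, 3, 4, 5], charged)
  rescue RuntimeError => e
    puts "attempt #{attempt + 1} failed: #{e.message}"
  end
end

puts "charged after 3 attempts: #{charged.inspect}" # => [1, 2]
```

Subscriptions 4 and 5 are never reached, no matter how many times the job is retried.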

Large jobs that operate on a lot of data and run for a long time are magnets for failures. It can often be difficult to unwind what went wrong and correct it. If we could break up the logic into manageable chunks, that might make it easier.

Breaking up Batch Operations to Small Chunks

Let's keep ChargeSubscriptionsJob selecting the subscriptions to charge but, instead of charging them itself, have it queue a job for each subscription. This is called "fanning out" because it's usually diagrammed like so, which looks like fanning out playing cards:

Figure: One job fanning out to other jobs. Diagram showing a job named 'ChargeSubscriptionsJob' that is pointing to 5 nodes named 'ChargeJob'.

Let's try it. ChargeSubscriptionsJob will queue ChargeJob like so:

    class ChargeSubscriptionsJob
      include Sidekiq::Job

      def perform
        Subscription.where(next_charge_on: Date.today).find_each do |subscription|
          # Queue one job per subscription instead of charging inline
          ChargeJob.perform_async(subscription.id)
          # Only move the date forward once the job is safely queued
          subscription.update!(next_charge_on: Date.today + 1.month)
        end
      end
    end

The ChargeJob contains all the code we just removed:

    class ChargeJob
      include Sidekiq::Job

      def perform(subscription_id)
        payment_processor = ThirdPartyPaymentProcessor.new
        subscription = Subscription.find(subscription_id)

        payment_processor.charge!(
          subscription.customer.payment_processor_id,
          subscription.monthly_charge_cents
        )
      end
    end

Now, ChargeSubscriptionsJob doesn't depend on the payment processor. It just depends on the database and the Redis being used for Sidekiq. These are under our control and less likely to fail. And, since we only update next_charge_on after we successfully queue ChargeJob, if ChargeSubscriptionsJob gets retried, it won't queue the same subscription twice.

This also means that any problematic subscription won't spoil the entire batch. The so-called poison pill subscription would continue to fail, but each time it got retried, other subscriptions would get processed first. This failed job no longer prevents the entire batch from completing, turning it into just another failed job and not a traditional poison pill.
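A rough simulation shows the difference. This is plain Ruby, not Sidekiq: an array stands in for the queue, a failing job is pushed to the back as a retry would be, and the attempt limit of 3 is an arbitrary value picked for the sketch. All names are hypothetical:

```ruby
# Plain-Ruby simulation of fanning out: one queue entry per subscription.
# A failing job goes to the back of the queue, so it cannot block the others.
POISON_SUBSCRIPTION_ID = 3
MAX_ATTEMPTS = 3 # arbitrary limit for this sketch

queue   = [1, 2, 3, 4, 5].map { |id| { id: id, attempts: 0 } }
charged = []
dead    = []

until queue.empty?
  job = queue.shift
  job[:attempts] += 1
  begin
    raise "cannot charge #{job[:id]}" if job[:id] == POISON_SUBSCRIPTION_ID

    charged << job[:id]
  rescue RuntimeError
    # Retry later, or give up once the attempt limit is reached
    job[:attempts] < MAX_ATTEMPTS ? queue.push(job) : dead.push(job[:id])
  end
end

puts "charged: #{charged.inspect}"    # => [1, 2, 4, 5]
puts "gave up on: #{dead.inspect}"    # => [3]
```

The problematic subscription still fails, but every other subscription gets charged.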

Of course, changing our design to fan out jobs introduces other failure modes we need to address.

Failures When Fanning Out

If you think about our updated design, the ChargeJob instances queued to Sidekiq are the only place we have a record of which subscriptions to charge and how much to charge them. Sidekiq is a great job processor, but it's not a database.

What this means is that if monthly_charge_cents changed after a ChargeJob was queued, but before it was processed, we'd charge the wrong amount. Worse, if we lost Redis, we could lose some ChargeJobs and have no idea which subscriptions needed to get charged. Sidekiq does its best to avoid this situation, but Redis is not a resilient database like Postgres.

What we should do is use our database to store the information we need to persist, and have our Sidekiq jobs fetch the data they need from there. The ChargeJob is really an intention to charge money that, when processed, becomes realized. We should store that intention in our database.
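To make the wrong-amount problem concrete, here is a plain-Ruby sketch in which hashes stand in for database tables. The `charge_intentions` hash is a hypothetical name for wherever the stored intention ends up living:

```ruby
# Plain-Ruby illustration of a price change between queueing and processing.
# Hashes stand in for tables; nothing here touches Sidekiq or a database.
subscriptions     = { 42 => { monthly_charge_cents: 1_000 } }
charge_intentions = {}

# Queue time: the job carries only a subscription id...
queued_job = { subscription_id: 42 }
# ...while a stored intention records the amount owed at that moment.
charge_intentions[1] = {
  subscription_id: 42,
  charge_cents: subscriptions[42][:monthly_charge_cents]
}

# The subscription's price changes before the job is processed.
subscriptions[42][:monthly_charge_cents] = 1_500

amount_from_subscription = subscriptions[queued_job[:subscription_id]][:monthly_charge_cents]
amount_from_intention    = charge_intentions[1][:charge_cents]

puts "job reading the subscription would charge #{amount_from_subscription} cents"
puts "job reading the stored intention would charge #{amount_from_intention} cents"
```

The job that looks up the subscription at run time charges the new price, while the stored intention preserves the amount that was actually due.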

Using the Database To Store Operational Data

Let's call this an invoice. It'll reference a subscription, hold the amount to charge, the original charge_on date, and a nullable value for when the charge was completed:

Figure: Updated domain with invoices. ERD diagram showing an invoices model that references a subscription model that references a customer model. The invoice has 'id', 'subscription id', 'charge cents', 'charge on', and 'charged at' fields. The subscription has 'id', 'customer id', 'monthly charge cents', and 'next charge on' fields. The customer has an 'id' field and a 'payment processor id' field. There is an arrow from the subscription's customer id field to the customer's id field.

Now, ChargeSubscriptionsJob will create an invoice and ChargeJob will accept an invoice id to charge. Because ChargeSubscriptionsJob now has to both create the invoice and update the Subscription, we want to perform both of those inside a database transaction. That way, either both changes are made or neither is, and we don't end up in a half-updated state.

    class ChargeSubscriptionsJob
      include Sidekiq::Job

      def perform
        Subscription.where(next_charge_on: Date.today).find_each do |subscription|
          invoice = nil # declared here so it is visible after the transaction block

          ActiveRecord::Base.transaction do
            invoice = subscription.invoices.create!(
              charge_on: subscription.next_charge_on,
              charge_cents: subscription.monthly_charge_cents,
              charged_at: nil
            )
            subscription.update!(
              next_charge_on: Date.today + 1.month
            )
          end

          # Queue only after the transaction has committed
          ChargeJob.perform_async(invoice.id)
        end
      end
    end

Note that ChargeJob is now queued after all the database updates. While, in theory, we could queue it right after creating the invoice, that would require doing so inside an open database transaction. This is bad. At even moderate scale, this can cause the transaction, and the locks it holds, to stay open for too long and have a cascading effect on the system. This effect can be extremely hard to trace back to the open transaction.

This has implications we'll get to in a minute, but let's see the updated ChargeJob:

    class ChargeJob
      include Sidekiq::Job

      def perform(invoice_id)
        payment_processor = ThirdPartyPaymentProcessor.new
        invoice = Invoice.find(invoice_id)

        # Skip invoices that a previous run already charged
        if invoice.charged_at.present?
          Rails.logger.info "Invoice #{invoice.id} already charged"
          return
        end

        customer = invoice.subscription.customer
        payment_processor.charge!(
          customer.payment_processor_id,
          invoice.charge_cents
        )
        invoice.update!(charged_at: Time.zone.now)
      end
    end

ChargeJob is mostly the same, except it now updates the invoice to indicate it was charged. It also checks to make sure the invoice wasn't already charged.

This now includes everything needed to manage these jobs inside the database. If we lost Redis entirely, we could look at any invoice where charged_at is null and know that it hasn't been charged. In fact, we could eliminate the need for ChargeSubscriptionsJob to queue ChargeJobs entirely by creating a new job called ChargeOutstandingInvoicesJob.

Using the Database to Drive Job Queueing

First, we remove the line in ChargeSubscriptionsJob that queues ChargeJob:

    class ChargeSubscriptionsJob
      include Sidekiq::Job

      def perform
        Subscription.where(next_charge_on: Date.today).find_each do |subscription|
          ActiveRecord::Base.transaction do
            subscription.invoices.create!(
              charge_on: subscription.next_charge_on,
              charge_cents: subscription.monthly_charge_cents,
              charged_at: nil
            )
            subscription.update!(
              next_charge_on: Date.today + 1.month
            )
          end
          # XXX removed: ChargeJob is no longer queued here
        end
      end
    end

This means that ChargeSubscriptionsJob is always safe to retry under any circumstance, since it will always pick up where it left off, as long as it completes all subscriptions before the end of the day.

To get the invoices charged, ChargeOutstandingInvoicesJob will look like so:

    class ChargeOutstandingInvoicesJob
      include Sidekiq::Job

      def perform
        # Queue a ChargeJob for every invoice that has not been charged yet
        Invoice.where(charged_at: nil).find_each do |invoice|
          ChargeJob.perform_async(invoice.id)
        end
      end
    end

Is ChargeOutstandingInvoicesJob safe to retry? Yes, with a qualification. Since ChargeJob checks that charged_at is null, this avoids a race condition where a retry of ChargeOutstandingInvoicesJob could queue two ChargeJobs for the same invoice.
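The guard can be sketched in plain Ruby. A hash stands in for an invoices row, a lambda stands in for ChargeJob, and an array records calls to a pretend payment processor; all of these are hypothetical stand-ins. Note that the sketch runs the two jobs one after another, which is the case the check handles:

```ruby
# Plain-Ruby sketch of the charged_at guard, with jobs run sequentially.
invoice = { id: 7, charge_cents: 1_000, charged_at: nil }
charges_made = []

charge_job = lambda do |inv|
  if inv[:charged_at]
    puts "Invoice #{inv[:id]} already charged"
    next # leave the lambda without charging
  end

  charges_made << inv[:charge_cents]
  inv[:charged_at] = Time.now
end

# A retry of the queueing job results in two jobs for the same invoice.
2.times { charge_job.call(invoice) }

puts "charges made: #{charges_made.length}" # => 1
```

The second job sees charged_at set and exits without charging again.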

What is a problem with ChargeJob, regardless of how ChargeOutstandingInvoicesJob (or ChargeSubscriptionsJob) is implemented, is that the third party payment processor call needs to be idempotent. We need to make sure that the charge happens exactly once.
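One widely used way to get there, assuming the payment processor supports it, is to send an idempotency key with each charge. The sketch below is only an illustration of that idea: `FakeProcessor` and its `idempotency_key:` parameter are hypothetical, and real processors each have their own mechanism.

```ruby
# Hypothetical in-memory processor that honours an idempotency key: repeating
# a call with the same key returns the first result instead of charging again.
class FakeProcessor
  attr_reader :charges

  def initialize
    @charges = []
    @results_by_key = {}
  end

  def charge!(customer_id, cents, idempotency_key:)
    @results_by_key[idempotency_key] ||= begin
      @charges << { customer_id: customer_id, cents: cents }
      { charge_number: @charges.length }
    end
  end
end

processor = FakeProcessor.new
key = "invoice-7" # derived from the invoice id, so a retry reuses the same key

first  = processor.charge!("cus_123", 1_000, idempotency_key: key)
second = processor.charge!("cus_123", 1_000, idempotency_key: key) # simulated retry

puts "charges recorded: #{processor.charges.length}" # => 1
puts "same result returned: #{first == second}"      # => true
```

Deriving the key from the invoice id means every retry of the same ChargeJob presents the same key.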

This is covered in detail in the book. There is a sample app that demonstrates this exact problem, and a detailed discussion of how to manage it. The book shows you some code to address it, and you can see it working with the example app.

Addendum - Bulk Queueing API

If you aren't familiar with Sidekiq's Bulk Queueing API, a better way to implement ChargeOutstandingInvoicesJob would be to bulk queue the ids in batches of 1000, like so:

    class ChargeOutstandingInvoicesJob
      include Sidekiq::Job

      def perform
        array_of_job_args = Invoice.
          where(charged_at: nil). # Get all not charged
          pluck(:id).             # Get only their ids
          zip                     # Turn each element into
                                  # a single-element array

        # Batch size is 1000 by default
        ChargeJob.perform_bulk(array_of_job_args)
      end
    end

This is a more efficient (and thus less error prone) way to queue a bunch of jobs based on the results of a database query. array_of_job_args is an array where each element represents an invoice, and those elements are themselves arrays that contain a single argument: the invoice's id.
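The shape of that array is easy to check in plain Ruby, without Sidekiq. The ids below are made up, and `each_slice` is used here only to illustrate how a list divides into batches of 1,000, not to show how Sidekiq batches internally:

```ruby
# Plain Ruby: what `zip` does to a list of ids, and how a large argument
# list divides into batches of 1,000.
ids = (1..2_500).to_a

array_of_job_args = ids.zip
puts array_of_job_args.first(3).inspect # => [[1], [2], [3]]

batches = array_of_job_args.each_slice(1_000).to_a
puts batches.map(&:length).inspect # => [1000, 1000, 500]
```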

Published on November 09, 2023 08:00