Skip to main content

Async tasks

Flexible, durable scheduling across processes and time.

Write resilient scheduling logic with the flexibility of code: delay execution, re-attach to ongoing tasks, fan out/in, etc.
Restate runs your tasks to completion exactly once.

Durable timers
Register timers with Restate to make them durable. Restate makes sure that they get fired when they should, whether it's millis or months.
Message queue
Schedule tasks asynchronously, by using Restate as message queue. Tasks execute with workflow-like semantics and durability. Restate handles retries and recovery of progress.
Switch between async and sync
Schedule an async task, by using Restate as queue. Let another process latch on to it later and wait for the result, or peek at the output to see if it has finished.

Async tasks with Restate

Schedule tasks for now or later with the Restate SDK.

Execute any handler async

Every handler in Restate is executed asynchronously and can be treated as a reliable asynchronous task. No matter whether it is a simple function, or a complex workflow. Restate persists the requests to this handler and makes sure they run to completion. Restate handles retries and recovery upon failures.

task_submitter.ts
async_task_service.ts

const asyncTaskService = restate.service({
name: "taskWorker",
handlers: {
runTask: async (ctx: Context, params: TaskOpts) => {
return someHeavyWork(params);
},
},
});
export type AsyncTaskService = typeof asyncTaskService;
const endpoint = restate.endpoint().bind(asyncTaskService).listen(9080);

Schedule tasks reliably

Schedule tasks asynchronously, by using Restate as message queue. Restate reliably queues them, also under backpressure/load.

Handlers can be called asynchronously from anywhere. This returns a task handle once the call in enqueued.

task_submitter.ts
async_task_service.ts

async function submitAndAwaitTask(task: TaskOpts) {
const rs = restate.connect({ url: RESTATE_URL });
const taskHandle = await rs
.serviceSendClient<AsyncTaskService>({ name: "taskWorker" })
.runTask(
task,
SendOpts.from({ idempotencyKey: "dQw4w9WgXcQ" })
);
// await the handler's result
const result = await rs.result(taskHandle);
}
async function attachToTask(taskHandle: string) {
const rs = restate.connect({ url: RESTATE_URL });
const result2 = await rs.result<string>(JSON.parse(taskHandle));
}

Idempotent task submission

Use an idempotency key to ensure that the task is only scheduled once. Restate will deduplicate the request and return the previous response.

task_submitter.ts
async_task_service.ts

async function submitAndAwaitTask(task: TaskOpts) {
const rs = restate.connect({ url: RESTATE_URL });
const taskHandle = await rs
.serviceSendClient<AsyncTaskService>({ name: "taskWorker" })
.runTask(
task,
SendOpts.from({ idempotencyKey: "dQw4w9WgXcQ" })
);
// await the handler's result
const result = await rs.result(taskHandle);
}
async function attachToTask(taskHandle: string) {
const rs = restate.connect({ url: RESTATE_URL });
const result2 = await rs.result<string>(JSON.parse(taskHandle));
}

Latch on to the task

For requests with an idempotency key, you can use this task handle to latch on to the task later and retrieve the result, or wait for it to finish.

task_submitter.ts
async_task_service.ts

async function submitAndAwaitTask(task: TaskOpts) {
const rs = restate.connect({ url: RESTATE_URL });
const taskHandle = await rs
.serviceSendClient<AsyncTaskService>({ name: "taskWorker" })
.runTask(
task,
SendOpts.from({ idempotencyKey: "dQw4w9WgXcQ" })
);
// await the handler's result
const result = await rs.result(taskHandle);
}
async function attachToTask(taskHandle: string) {
const rs = restate.connect({ url: RESTATE_URL });
const result2 = await rs.result<string>(JSON.parse(taskHandle));
}

This works across processes, so you can have a separate process latch on to the task later.

task_submitter.ts
async_task_service.ts

async function submitAndAwaitTask(task: TaskOpts) {
const rs = restate.connect({ url: RESTATE_URL });
const taskHandle = await rs
.serviceSendClient<AsyncTaskService>({ name: "taskWorker" })
.runTask(
task,
SendOpts.from({ idempotencyKey: "dQw4w9WgXcQ" })
);
// await the handler's result
const result = await rs.result(taskHandle);
}
async function attachToTask(taskHandle: string) {
const rs = restate.connect({ url: RESTATE_URL });
const result2 = await rs.result<string>(JSON.parse(taskHandle));
}

Execute any handler async

Every handler in Restate is executed asynchronously and can be treated as a reliable asynchronous task. No matter whether it is a simple function, or a complex workflow. Restate persists the requests to this handler and makes sure they run to completion. Restate handles retries and recovery upon failures.

Schedule tasks reliably

Schedule tasks asynchronously, by using Restate as message queue. Restate reliably queues them, also under backpressure/load.

Handlers can be called asynchronously from anywhere. This returns a task handle once the call in enqueued.

Idempotent task submission

Use an idempotency key to ensure that the task is only scheduled once. Restate will deduplicate the request and return the previous response.

Latch on to the task

For requests with an idempotency key, you can use this task handle to latch on to the task later and retrieve the result, or wait for it to finish.

This works across processes, so you can have a separate process latch on to the task later.

task_submitter.ts
async_task_service.ts

const asyncTaskService = restate.service({
name: "taskWorker",
handlers: {
runTask: async (ctx: Context, params: TaskOpts) => {
return someHeavyWork(params);
},
},
});
export type AsyncTaskService = typeof asyncTaskService;
const endpoint = restate.endpoint().bind(asyncTaskService).listen(9080);

LOW-LATENCY

Restate’s event-driven foundation built in Rust lets you queue events. Restate pushes them to your functions at high speed.

DURABLE EXECUTION

Restate makes sure all tasks run to completion. It keeps track of timers, handles retries and recovery upon failures, and ensures that tasks are executed exactly once.

Parallelizing work with Restate

Write flexible scheduling logic via durable building blocks.

Restate makes it easy to parallelize async work by fanning out tasks. Afterwards, you can collect the result by fanning in the partial results. Durable Execution ensures that the fan-out and fan-in steps happen reliably exactly once.

fan_out_worker.ts

const workerService = restate.service({
name: "worker",
handlers: {
run: async (ctx: Context, task: Task) => {
// Split the task in subtasks
const subtasks: SubTask[] = await ctx.run("split task", () =>
split(task)
);
const resultPromises = [];
for (const subtask of subtasks) {
const subResultPromise = ctx
.serviceClient(workerService)
.runSubtask(subtask);
resultPromises.push(subResultPromise);
}
const results = await CombineablePromise.all(resultPromises);
return aggregate(results);
},
runSubtask: async (ctx: Context, subtask: SubTask) => {
// Processing logic goes here ...
// Can be moved to a separate service to scale independently
},
},
});
export const handler = restate.endpoint().bind(workerService).handler();

Fan out

Fan out tasks by calling the subtask handler for each subtask. Every handler is an asynchronous task, for which Restate serves as the queue.

The subtasks might run in different processes, if this is deployed in a parallel setup.

fan_out_worker.ts

const workerService = restate.service({
name: "worker",
handlers: {
run: async (ctx: Context, task: Task) => {
// Split the task in subtasks
const subtasks: SubTask[] = await ctx.run("split task", () =>
split(task)
);
const resultPromises = [];
for (const subtask of subtasks) {
const subResultPromise = ctx
.serviceClient(workerService)
.runSubtask(subtask);
resultPromises.push(subResultPromise);
}
const results = await CombineablePromise.all(resultPromises);
return aggregate(results);
},
runSubtask: async (ctx: Context, subtask: SubTask) => {
// Processing logic goes here ...
// Can be moved to a separate service to scale independently
},
},
});
export const handler = restate.endpoint().bind(workerService).handler();

Fan in

Invocations produce durable promises that can be awaited and combined. Fan in by simply awaiting the combined promise. Invocation promises recover from failures, re-connect to running subtasks.

fan_out_worker.ts

const workerService = restate.service({
name: "worker",
handlers: {
run: async (ctx: Context, task: Task) => {
// Split the task in subtasks
const subtasks: SubTask[] = await ctx.run("split task", () =>
split(task)
);
const resultPromises = [];
for (const subtask of subtasks) {
const subResultPromise = ctx
.serviceClient(workerService)
.runSubtask(subtask);
resultPromises.push(subResultPromise);
}
const results = await CombineablePromise.all(resultPromises);
return aggregate(results);
},
runSubtask: async (ctx: Context, subtask: SubTask) => {
// Processing logic goes here ...
// Can be moved to a separate service to scale independently
},
},
});
export const handler = restate.endpoint().bind(workerService).handler();

Server(less)

Deploy this service on an platform like Kubernetes or AWS Lambda to automatically get parallel scale out.

fan_out_worker.ts

const workerService = restate.service({
name: "worker",
handlers: {
run: async (ctx: Context, task: Task) => {
// Split the task in subtasks
const subtasks: SubTask[] = await ctx.run("split task", () =>
split(task)
);
const resultPromises = [];
for (const subtask of subtasks) {
const subResultPromise = ctx
.serviceClient(workerService)
.runSubtask(subtask);
resultPromises.push(subResultPromise);
}
const results = await CombineablePromise.all(resultPromises);
return aggregate(results);
},
runSubtask: async (ctx: Context, subtask: SubTask) => {
// Processing logic goes here ...
// Can be moved to a separate service to scale independently
},
},
});
export const handler = restate.endpoint().bind(workerService).handler();

Restate makes it easy to parallelize async work by fanning out tasks. Afterwards, you can collect the result by fanning in the partial results. Durable Execution ensures that the fan-out and fan-in steps happen reliably exactly once.

Fan out

Fan out tasks by calling the subtask handler for each subtask. Every handler is an asynchronous task, for which Restate serves as the queue.

The subtasks might run in different processes, if this is deployed in a parallel setup.

Fan in

Invocations produce durable promises that can be awaited and combined. Fan in by simply awaiting the combined promise. Invocation promises recover from failures, re-connect to running subtasks.

Server(less)

Deploy this service on an platform like Kubernetes or AWS Lambda to automatically get parallel scale out.

fan_out_worker.ts

const workerService = restate.service({
name: "worker",
handlers: {
run: async (ctx: Context, task: Task) => {
// Split the task in subtasks
const subtasks: SubTask[] = await ctx.run("split task", () =>
split(task)
);
const resultPromises = [];
for (const subtask of subtasks) {
const subResultPromise = ctx
.serviceClient(workerService)
.runSubtask(subtask);
resultPromises.push(subResultPromise);
}
const results = await CombineablePromise.all(resultPromises);
return aggregate(results);
},
runSubtask: async (ctx: Context, subtask: SubTask) => {
// Processing logic goes here ...
// Can be moved to a separate service to scale independently
},
},
});
export const handler = restate.endpoint().bind(workerService).handler();

Restate as sophisticated task queue

Restate is built as an event-driven foundation, and therefore supports task queues by design.
Async tasks run like any other function in your infrastructure: on K8S, FaaS, or mix-and-match.
No need to spin up extra infrastructure or message queues.

Restate as sophisticated task queue

Switch between async and sync with Restate

Imagine a data preparation workflow that creates an S3 bucket, uploads a file to it, and then returns the URL.

Let's now kick off this workflow from another process.

data_preparation_service.ts
client.ts

const dataPreparationService = restate.workflow({
name: "dataPrep",
handlers: {
run: async (ctx: WorkflowContext, args: { userId: string }) => {
const url = await ctx.run("create S3 bucket", () => createS3Bucket());
await ctx.run("upload data", () => uploadData(url));
await ctx.promise<URL>("url").resolve(url);
return url;
},
resultAsEmail: async (
ctx: WorkflowSharedContext,
req: { email: string }
) => {
const url = await ctx.promise<URL>("url");
await ctx.run("send email", () => sendEmail(url, req.email));
},
},
});
export type DataPrepService = typeof dataPreparationService;

  1. Connect to the Restate server and create a client for the data preparation workflow.
data_preparation_service.ts
client.ts

const rs = restate.connect({ url: RESTATE_URL });
const dataPrepService: DataPrepService = { name: "dataPrep" };
async function downloadData(userId: string) {
const dataPrep = rs.workflowClient(dataPrepService, userId);
await dataPrep.workflowSubmit({ userId });
const result = await withTimeout(dataPrep.workflowAttach(), 30_000);
if (result === Timeout) {
const email = await readLine("This takes long... Mail us the link later");
await dataPrep.resultAsEmail({ email });
return;
}
// ... process directly ...
}

  1. Kick off a new data preparation workflow. This is idempotent per workflow ID.
data_preparation_service.ts
client.ts

const rs = restate.connect({ url: RESTATE_URL });
const dataPrepService: DataPrepService = { name: "dataPrep" };
async function downloadData(userId: string) {
const dataPrep = rs.workflowClient(dataPrepService, userId);
await dataPrep.workflowSubmit({ userId });
const result = await withTimeout(dataPrep.workflowAttach(), 30_000);
if (result === Timeout) {
const email = await readLine("This takes long... Mail us the link later");
await dataPrep.resultAsEmail({ email });
return;
}
// ... process directly ...
}

  1. Wait for the result for 30 seconds.
data_preparation_service.ts
client.ts

const rs = restate.connect({ url: RESTATE_URL });
const dataPrepService: DataPrepService = { name: "dataPrep" };
async function downloadData(userId: string) {
const dataPrep = rs.workflowClient(dataPrepService, userId);
await dataPrep.workflowSubmit({ userId });
const result = await withTimeout(dataPrep.workflowAttach(), 30_000);
if (result === Timeout) {
const email = await readLine("This takes long... Mail us the link later");
await dataPrep.resultAsEmail({ email });
return;
}
// ... process directly ...
}

  1. If it takes longer, rewire the workflow to send an email instead. If returns within 30 seconds, process the URL directly.
data_preparation_service.ts
client.ts

const rs = restate.connect({ url: RESTATE_URL });
const dataPrepService: DataPrepService = { name: "dataPrep" };
async function downloadData(userId: string) {
const dataPrep = rs.workflowClient(dataPrepService, userId);
await dataPrep.workflowSubmit({ userId });
const result = await withTimeout(dataPrep.workflowAttach(), 30_000);
if (result === Timeout) {
const email = await readLine("This takes long... Mail us the link later");
await dataPrep.resultAsEmail({ email });
return;
}
// ... process directly ...
}

  1. This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email.
data_preparation_service.ts
client.ts

const dataPreparationService = restate.workflow({
name: "dataPrep",
handlers: {
run: async (ctx: WorkflowContext, args: { userId: string }) => {
const url = await ctx.run("create S3 bucket", () => createS3Bucket());
await ctx.run("upload data", () => uploadData(url));
await ctx.promise<URL>("url").resolve(url);
return url;
},
resultAsEmail: async (
ctx: WorkflowSharedContext,
req: { email: string }
) => {
const url = await ctx.promise<URL>("url");
await ctx.run("send email", () => sendEmail(url, req.email));
},
},
});
export type DataPrepService = typeof dataPreparationService;

Imagine a data preparation workflow that creates an S3 bucket, uploads a file to it, and then returns the URL.

Let's now kick off this workflow from another process.

  1. Connect to the Restate server and create a client for the data preparation workflow.
  1. Kick off a new data preparation workflow. This is idempotent per workflow ID.
  1. Wait for the result for 30 seconds.
  1. If it takes longer, rewire the workflow to send an email instead. If returns within 30 seconds, process the URL directly.
  1. This is implemented in the data preparation workflow by letting the workflow signal our handler when it's done. It does this by resolving a shared Durable Promise that we then retrieve in our handler to send the email.
data_preparation_service.ts
client.ts

const dataPreparationService = restate.workflow({
name: "dataPrep",
handlers: {
run: async (ctx: WorkflowContext, args: { userId: string }) => {
const url = await ctx.run("create S3 bucket", () => createS3Bucket());
await ctx.run("upload data", () => uploadData(url));
await ctx.promise<URL>("url").resolve(url);
return url;
},
resultAsEmail: async (
ctx: WorkflowSharedContext,
req: { email: string }
) => {
const url = await ctx.promise<URL>("url");
await ctx.run("send email", () => sendEmail(url, req.email));
},
},
});
export type DataPrepService = typeof dataPreparationService;

What you can build with Async Tasks and Restate

Payments: Combining sync & async responses

Issue an idempotent payment to Stripe and process the response. The payment provider either responds immediately or notifies us later via a webhook.

Job scheduler

A job scheduler that can handle both immediate and delayed jobs. The scheduler makes sure job submissions are idempotent and that jobs run to completion exactly once. You can attach back to jobs to retrieve their result later.

Building Durable Promises on top of Restate

Promises that get durably stored in Restate, and can be resolved by any process. Implemented as a Restate service, to use for callbacks, signal and communicate between systems. The SDK offers similar constructs with Awakeables and Durable Promises.

Wondering about a specific use case?

Let’s brainstorm together on Discord

Developer Resources

Docs

Read the docs to learn more.

Need help?

Join the Restate Discord channel