Versioning - Python SDK
The definition code of a Temporal Workflow must be deterministic because Temporal uses event sourcing to reconstruct the Workflow state by replaying the saved Event History against the Workflow Definition code. This means that any incompatible update to the Workflow Definition code could cause a non-determinism error if not handled correctly.
Introduction to Versioning
Because we design for potentially long-running Workflows at scale, versioning with Temporal works differently. We explain more in this optional 30-minute introduction: https://www.youtube.com/watch?v=kkP899WxgzY
How to use the Python SDK Patching API
In principle, the Python SDK's patching mechanism operates similarly to other SDKs in a "feature-flag" fashion. However, the "versioning" API now uses the concept of "patching in" code.
To understand this, you can break it down into three steps, which reflect three stages of migration:
- Running pre_patch_activity code while concurrently patching in post_patch_activity.
- Running post_patch_activity code with deprecation markers for my-patch patches.
- Running only the post_patch_activity code.
Let's walk through this process in sequence.
Suppose you have an initial Workflow version called pre_patch_activity:
View the source code in the context of the rest of the application code.
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from activities import pre_patch_activity

# ...

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        self._result = await workflow.execute_activity(
            pre_patch_activity,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
Now, you want to update your code to run post_patch_activity instead. This represents your desired end state.
View the source code in the context of the rest of the application code.
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from activities import post_patch_activity

# ...

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        self._result = await workflow.execute_activity(
            post_patch_activity,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
Problem: You cannot deploy post_patch_activity directly until you're certain there are no more running Workflows created using the pre_patch_activity code; otherwise, you are likely to cause a nondeterminism error. Instead, you'll need to deploy post_patch_activity alongside the old code and use the patched() function to determine which version of the code to execute.
Implementing patching involves three steps:
- Use patched() to patch in new code and run it alongside the old code.
- Remove the old code and apply deprecate_patch().
- Once you're confident that all old Workflows have finished executing, remove deprecate_patch().
Overview
The following sample shows how the patched() function behaves, providing explanations at each stage of the patching flow:
if patched('v3'):
    # This is the newest version of the code.
    # The patched statement above will do one of three things:
    # 1. If the execution is not Replaying, it will evaluate to True
    #    and write a Marker Event to the history with Patch ID v3.
    #    This code block will run.
    # 2. If the execution is Replaying, and the original run put a
    #    Patch ID v3 at this location in the event history, it will
    #    evaluate to True, and this code block will run.
    # 3. If the execution is Replaying, and the original run has a
    #    Patch ID other than v3 at this location in the event history,
    #    it will evaluate to False, and this code block won't run.
    pass
elif patched('v2'):
    # This is the second version of the code.
    # The patched statement above will do one of three things:
    # 1. If the execution is not Replaying, the execution won't get
    #    here, because the first patched statement will be True.
    # 2. If the execution is Replaying, and the original run put a
    #    Patch ID v2 marker at this location in the event history, it
    #    will evaluate to True, and this code block will run.
    # 3. If the execution is Replaying, and the original run has a
    #    Patch ID other than v2 at this location in the event history,
    #    or doesn't have a patch marker at this location at all, it
    #    will evaluate to False, and this code block won't run.
    pass
else:
    # This is the original version of the code.
    # The patched statements above will do one of three things:
    # 1. If the execution is not Replaying, the execution won't get
    #    here, because the first patched statement will be True.
    # 2. If the execution is Replaying, and the original run had a
    #    patch marker v3 or v2 at this location in the event history,
    #    the execution won't get here, because the first or second
    #    patched statement will be True (respectively).
    # 3. If the execution is Replaying, and condition 2 doesn't hold,
    #    this code block will run.
    pass
To add more clarity, the following sample shows how patched() behaves in a conditional block that doesn't have the newest code at the top. Because patched() always returns True when not Replaying, this snippet will run the v2 branch instead of v3 in new executions.
if patched('v2'):
    # This is bad: when doing an original execution (i.e. not Replaying),
    # all patched statements evaluate to True (and put a marker in the
    # event history), which means that new executions will use v2 and
    # never reach v3 below.
    pass
elif patched('v3'):
    pass
else:
    pass
The moral of the story is that when not Replaying, patched() will return True and write the patch ID to the Event History. When Replaying, it will only return True if the patch ID matches the one in the Event History.
Patching in new code
Using patched() inserts a marker into the Workflow History. During replay, if a Worker encounters a history with that marker, it will fail the Workflow Task when the Workflow code doesn't produce the same patch marker (in this case, my-patch). This ensures you can safely deploy the post_patch_activity code as a "feature flag" alongside the original version (pre_patch_activity).
View the source code in the context of the rest of the application code.
# ...
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        if workflow.patched("my-patch"):
            self._result = await workflow.execute_activity(
                post_patch_activity,
                schedule_to_close_timeout=timedelta(minutes=5),
            )
        else:
            self._result = await workflow.execute_activity(
                pre_patch_activity,
                schedule_to_close_timeout=timedelta(minutes=5),
            )
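If you want to confirm that the patched code still replays cleanly against histories recorded by pre-patch Workers before rolling it out, one option is the SDK's Replayer. The following is a minimal sketch, not part of the official sample: it assumes MyWorkflow from the code above is importable, and that you have exported a pre-patch execution's Event History to a JSON file (the file name and Workflow Id shown here are placeholders).

from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer


async def check_pre_patch_history_replays() -> None:
    # Load an Event History exported from a pre-patch execution
    # (file name and Workflow Id are placeholders for this sketch).
    with open("pre_patch_history.json") as f:
        history = WorkflowHistory.from_json("my-workflow-id", f.read())

    # Replay it against the current (patched) Workflow code. This raises
    # an error if the code no longer matches the recorded history.
    replayer = Replayer(workflows=[MyWorkflow])
    await replayer.replay_workflow(history)

If the replay succeeds, the patched code is compatible with the pre-patch histories and is safe to deploy.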
Understanding deprecated Patches in the Python SDK
After ensuring that all Workflows started with the pre_patch_activity code have finished, you can deprecate the patch: once you're confident that your Workflows are no longer running the pre-patch code paths, deploy your code with deprecate_patch(). These Workers will be running the most up-to-date version of the Workflow code, which no longer requires the patch. The deprecate_patch() function works similarly to patched() by recording a marker in the Workflow History, but this marker does not fail replay when the Workflow code does not emit it. Deprecated patches serve as a bridge between the pre-patch and post-patch code paths, and are useful for avoiding errors resulting from patched code paths in your Workflow History.
View the source code in the context of the rest of the application code.
# ...
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        workflow.deprecate_patch("my-patch")
        self._result = await workflow.execute_activity(
            post_patch_activity,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
Safe Deployment of post_patch_activity
Once you're sure that you will no longer need to Query or Replay any of your pre-patch Workflows, you can safely deploy Workers that no longer use either the patched() or deprecate_patch() calls:
View the source code in the context of the rest of the application code.
# ...
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        self._result = await workflow.execute_activity(
            post_patch_activity,
            schedule_to_close_timeout=timedelta(minutes=5),
        )
Best Practice of Using Python Dataclasses as Arguments and Returns
As a side note on the Patching API, its behavior is why Temporal recommends using single dataclasses as arguments and returns from Signals, Queries, Updates, and Activities, rather than multiple arguments. The Patching API's main use case is to support branching inside an if block of a method body. It is not designed to set different methods or method signatures for different Workflow versions. Because of this, Temporal recommends that each Signal, Activity, etc., accepts a single dataclass and returns a single dataclass, so the method signature can stay constant and you can do your versioning logic using patched() within the method body.
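As a rough illustration of this pattern (the dataclass, field, and Activity names below are made up for this sketch, not part of the official samples), the Activity signature stays a single dataclass, new fields can be added later without breaking it, and patched() handles the version branching inside the Workflow body:

from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow


@dataclass
class GreetInput:
    # A single dataclass argument lets you add fields later
    # without changing the Activity signature.
    name: str
    greeting: str = "Hello"


@activity.defn
async def greet(input: GreetInput) -> str:
    return f"{input.greeting}, {input.name}!"


@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, input: GreetInput) -> str:
        # Versioning logic lives inside the method body, so the
        # signature never has to change between versions.
        if workflow.patched("use-formal-greeting"):
            input.greeting = "Good day"
        return await workflow.execute_activity(
            greet,
            input,
            schedule_to_close_timeout=timedelta(minutes=5),
        )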
How to use Worker Versioning in Python
Worker Versioning is currently in Pre-release.
See the Pre-release README for more information.
A Build ID corresponds to a deployment. If you don't already have one, we recommend a hash of the code (such as a Git SHA) combined with a human-readable timestamp. To use Worker Versioning, you need to pass a Build ID to your Python Worker and opt in to Worker Versioning.
Assign a Build ID to your Worker and opt in to Worker Versioning
You should understand assignment rules before completing this step. See the Worker Versioning Pre-release README for more information.
To enable Worker Versioning for your Worker, assign the Build ID (perhaps from an environment variable) and turn it on.
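For example, one possible way to supply the Build ID is to have your CI pipeline compute it once per deployment and expose it to the Worker process. This is only an illustrative sketch; the TEMPORAL_BUILD_ID environment variable name is an assumption of this example, not something the SDK defines.

import os

# Hypothetical setup: CI computes something like "<deploy timestamp>-<git sha>"
# once per deployment and exposes it to the Worker via an environment variable.
build_id = os.environ["TEMPORAL_BUILD_ID"]  # e.g. "2024-05-01T12-00-00-deadbeef"

The Worker registration below then passes this build_id and opts in to Worker Versioning.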
# ...
worker = Worker(
    client,  # an existing temporalio.client.Client for your namespace
    task_queue="your_task_queue_name",
    build_id=build_id,
    use_worker_versioning=True,
    # ... register workflows & activities, etc.
)
# ...
Importantly, when you start this Worker, it won't receive any tasks until you set up assignment rules.
Specify versions for Activities, Child Workflows, and Continue-as-New Workflows
Python support for this feature is under construction!
By default, Activities, Child Workflows, and Continue-as-New Workflows run on the Build ID of the Workflow that created them, as long as they are configured to run on the same Task Queue. When configured to run on a separate Task Queue, they default to using the current assignment rules.
If you want to override this behavior, you can specify your intent via the versioning_intent argument available on the methods you use to invoke these commands. For example, if you want an Activity to use the latest assignment rules rather than inheriting from its parent:
# ...
await workflow.execute_activity(
    say_hello,
    "hi",
    versioning_intent=VersioningIntent.USE_ASSIGNMENT_RULES,
    start_to_close_timeout=timedelta(seconds=5),
)
# ...
Tell the Task Queue about your Worker's Build ID (Deprecated)
This section is for a previous Worker Versioning API that is deprecated and will eventually be removed. Please refer to Worker Versioning above instead.
Now you can use the SDK (or the Temporal CLI) to tell the Task Queue about your Worker's Build ID. You might want to do this as part of your CI deployment process.
# ...
await client.update_worker_build_id_compatibility(
    "your_task_queue_name", BuildIdOpAddNewDefault("deadbeef")
)
This code adds the deadbeef Build ID to the Task Queue as the sole version in a new version set, which becomes the default for the queue. New Workflows execute on Workers with this Build ID, and existing ones will continue to be processed by appropriately compatible Workers.
If, instead, you want to add the Build ID to an existing compatible set, you can do this:
# ...
await client.update_worker_build_id_compatibility(
    "your_task_queue_name", BuildIdOpAddNewCompatible("deadbeef", "some-existing-build-id")
)
This code adds deadbeef to the existing compatible set containing some-existing-build-id and marks it as the new default Build ID for that set.
You can also promote an existing Build ID in a set to be the default for that set:
# ...
await client.update_worker_build_id_compatibility(
    "your_task_queue_name", BuildIdOpPromoteBuildIdWithinSet("deadbeef")
)
You can also promote an entire set to become the default set for the queue. New Workflows will start using that set's default build.
# ...
await client.update_worker_build_id_compatibility(
    "your_task_queue_name", BuildIdOpPromoteSetByBuildId("deadbeef")
)
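If you want to double-check the effect of any of these updates, the same client can fetch the Task Queue's current version sets. A minimal sketch, assuming the get_worker_build_id_compatibility method is available in your SDK version:

# ...
sets = await client.get_worker_build_id_compatibility("your_task_queue_name")
print(sets)  # inspect the version sets and their default Build IDs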