Creating a Step
What is a step?
A step is a unit of processing logic in the SDK and can be used to define logic for the extraction, transformation, or storing of data. Steps are the building blocks of a processor. For example, the Aptos core processors implement (1) getting a stream of transactions from Transaction Stream, (2) extracting the data, (3) writing to a database, and (4) tracking the progress, each as a separate step.
There are two types of steps in the SDK:
- AsyncStep: Processes a batch of input items and returns a batch of output items.
- PollableAsyncStep: Does the same as AsyncStep, but it also periodically polls its internal state and returns a batch of output items if available.
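The difference between the two step types can be sketched with a toy, synchronous model. The traits and structs below (ToyStep, ToyPollableStep, Doubler, Holder) are hypothetical names invented for this illustration and are not the SDK's traits; the real steps are async and work on TransactionContext batches.

```rust
// Toy, synchronous stand-ins for the two step kinds. These traits are
// hypothetical and only mirror the behaviour described above; they are not
// the SDK's AsyncStep / PollableAsyncStep traits.

/// A step that turns one input batch into (optionally) one output batch.
trait ToyStep {
    type Input;
    type Output;
    fn process(&mut self, batch: Vec<Self::Input>) -> Option<Vec<Self::Output>>;
}

/// A step that can additionally be polled for output it holds internally.
trait ToyPollableStep: ToyStep {
    fn poll(&mut self) -> Option<Vec<Self::Output>>;
}

/// Plain step: every input batch immediately yields an output batch.
struct Doubler;

impl ToyStep for Doubler {
    type Input = u64;
    type Output = u64;

    fn process(&mut self, batch: Vec<u64>) -> Option<Vec<u64>> {
        Some(batch.into_iter().map(|x| x * 2).collect())
    }
}

/// Pollable step: `process` only stores items, `poll` hands them on.
#[derive(Default)]
struct Holder {
    held: Vec<u64>,
}

impl ToyStep for Holder {
    type Input = u64;
    type Output = u64;

    fn process(&mut self, batch: Vec<u64>) -> Option<Vec<u64>> {
        self.held.extend(batch);
        None // nothing is emitted until the next poll
    }
}

impl ToyPollableStep for Holder {
    fn poll(&mut self) -> Option<Vec<u64>> {
        if self.held.is_empty() {
            None
        } else {
            // Hand over everything held so far and leave the buffer empty.
            Some(std::mem::take(&mut self.held))
        }
    }
}
```

In this model, Doubler answers every call to process with output, while Holder returns nothing from process and only releases its items when poll is called.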
How to create a Step
To create a step with the SDK, follow these instructions:
- Implement the Processable trait. This trait defines several important details about the step: the input and output types, the processing logic, and the run type (either AsyncRunType or PollableAsyncRunType).

  ```rust
  #[async_trait]
  impl Processable for MyExtractorStep {
      // The Input is a batch of Transaction
      type Input = Transaction;
      // The Output is a batch of MyData
      type Output = MyData;
      // Depending on the type of step this is, the RunType is either
      // - AsyncRunType
      // - PollableAsyncRunType
      type RunType = AsyncRunType;

      // Processes a batch of input items and returns a batch of output items.
      async fn process(
          &mut self,
          input: TransactionContext<Transaction>,
      ) -> Result<Option<TransactionContext<MyData>>, ProcessorError> {
          let transactions = input.data;
          let data = transactions.iter().map(|transaction| {
              // Define the processing logic to extract MyData from a Transaction
          }).collect();

          Ok(Some(TransactionContext {
              data,
              metadata: input.metadata,
          }))
      }
  }
  ```

  In the example code above, the input and output types are wrapped in a TransactionContext. TransactionContext contains relevant metadata about the batch of data being processed, such as the transaction versions and timestamp, which is used for metrics and logging.
- Implement the NamedStep trait. This is used for logging.

  ```rust
  impl NamedStep for MyExtractorStep {
      fn name(&self) -> String {
          "MyExtractorStep".to_string()
      }
  }
  ```
- Implement either the AsyncStep trait or the PollableAsyncStep trait, which defines how the step will be run in the processor.
  - If you’re using AsyncStep, add this to your code:

    ```rust
    impl AsyncStep for MyExtractorStep {}
    ```
  - If you’re creating a PollableAsyncStep, you will need to define the poll interval and what the step should do every time it polls.

    ```rust
    #[async_trait]
    impl<T: Send + 'static> PollableAsyncStep for MyPollStep<T>
    where
        Self: Sized + Send + Sync + 'static,
        T: Send + 'static,
    {
        fn poll_interval(&self) -> std::time::Duration {
            // Define duration
        }

        async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<T>>>, ProcessorError> {
            // Define code here on what this step should do every time it polls
            // Optionally return a batch of output items
        }
    }
    ```
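To illustrate why batches travel inside a context wrapper, here is a minimal, self-contained sketch of a step body that transforms the data while passing the batch metadata through unchanged. ToyContext, ToyTransaction, ToyData, and their fields are hypothetical stand-ins chosen for this example; the SDK's TransactionContext and its metadata fields may look different.

```rust
/// Hypothetical stand-in for a context wrapper: a batch plus metadata
/// describing it. The field names are assumptions made for this sketch.
#[derive(Debug, Clone, PartialEq)]
struct ToyContext<T> {
    data: Vec<T>,
    start_version: u64,
    end_version: u64,
}

impl<T> ToyContext<T> {
    /// Transform every item while keeping the metadata untouched.
    fn map_data<U>(self, f: impl FnMut(T) -> U) -> ToyContext<U> {
        ToyContext {
            data: self.data.into_iter().map(f).collect(),
            start_version: self.start_version,
            end_version: self.end_version,
        }
    }
}

/// Hypothetical input item.
#[derive(Debug, Clone, PartialEq)]
struct ToyTransaction {
    version: u64,
    payload: String,
}

/// Hypothetical extracted item.
#[derive(Debug, Clone, PartialEq)]
struct ToyData {
    version: u64,
    payload_len: usize,
}

/// The "processing logic" of an extractor-like step: one output item per
/// input item, with the batch metadata carried over as-is.
fn extract(input: ToyContext<ToyTransaction>) -> ToyContext<ToyData> {
    input.map_data(|txn| ToyData {
        version: txn.version,
        payload_len: txn.payload.len(),
    })
}
```

Because the metadata stays attached to the batch, a later step can still report which transaction versions a batch covered without inspecting the transformed items.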
Common steps
The SDK provides several common steps that you can use in your processor.
- TransactionStreamStep provides a stream of Aptos transactions to the processor
- TimedBufferStep buffers a batch of items and periodically polls to release the items to the next step
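The buffering behaviour described for TimedBufferStep can be approximated with a small synchronous sketch. ToyTimedBuffer and its methods are hypothetical and written only for this illustration; they do not reflect the SDK's actual implementation. The current time is passed in explicitly so the behaviour is easy to follow.

```rust
use std::time::{Duration, Instant};

/// Toy buffer that collects items and releases them at most once per
/// interval. Hypothetical; not the SDK's TimedBufferStep.
struct ToyTimedBuffer<T> {
    items: Vec<T>,
    interval: Duration,
    last_release: Instant,
}

impl<T> ToyTimedBuffer<T> {
    fn new(interval: Duration, now: Instant) -> Self {
        Self {
            items: Vec::new(),
            interval,
            last_release: now,
        }
    }

    /// Store a batch; nothing is passed downstream yet.
    fn push(&mut self, batch: Vec<T>) {
        self.items.extend(batch);
    }

    /// Release everything buffered if at least `interval` has elapsed
    /// since the last release and there is something to release.
    fn poll(&mut self, now: Instant) -> Option<Vec<T>> {
        let waited = now.duration_since(self.last_release);
        if waited < self.interval || self.items.is_empty() {
            return None;
        }
        self.last_release = now;
        Some(std::mem::take(&mut self.items))
    }
}
```

With a five-second interval, two pushed batches are held back when polled after one second and released together as a single batch once five seconds have passed.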