Implementing Cross-entity Unique Keys in Kalix - Part 3

Renato Cavalcanti.
Principal Engineer, Lightbend.
  • 11 October 2022,
  • 8 minute read

Introduction

In part two we explored how to create unique cross-entity keys by using two entities: a Value Entity for the User handle and an Event Sourced Entity for the User account.

We explained that the calls to the two entities are not executed atomically, so there is a small chance that the call to the Handle entity succeeds while the call to the User entity fails. In that scenario, the requested handle ends up in a locked state: it is reserved, but not in use by any User entity. In this post we will explain how to use Kalix Timers to free it again.

NOTE

The full project code can be found here.

But before we cover the failure case, let’s first consider the successful flow of operations. When modeling the Handle entity, we added two fields: a handle field marked as a primary key and a confirmed boolean flag. The handle is confirmed when we can prove that it’s effectively in use. The best way of doing that is to listen to events from the User entity. We will do so by defining a new Action.

We will call it the UserWorkflow Action and define it in src/main/proto/com/example/workflow/workflow_user_action.proto

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "kalix/annotations.proto";
import "com/example/user/domain/user_domain.proto";

package com.example.workflow;

option java_outer_classname = "WorkflowUserActionApi";

service UserWorkflowAction {
option (kalix.codegen) = {
  action: {
    name: "UserWorkflow"
  }
};

option (kalix.service).acl.allow = { principal: ALL };

rpc OnUserCreated(com.example.user.domain.UserCreated) returns (google.protobuf.Empty) {
  option (kalix.method).eventing.in = {
    event_sourced_entity: "user"
  };
}
}

Note that it imports the user_domain.proto file and has a method subscribing to events from the user entity.

The Java implementation for this method will be:

@Override
public Effect<Empty> onUserCreated(UserDomain.UserCreated userCreated) {
  // build the command for the Handle entity from the handle carried by the event
  HandleApi.Handle confirmHandle =
    HandleApi.Handle.newBuilder()
      .setHandleId(userCreated.getHandle())
      .build();
  return effects().forward(components().handleEntity().confirm(confirmHandle));
}

The implementation receives the UserCreated event and then calls the HandleEntity to confirm the handle. As shown in the previous post, the confirmation method sets the confirmed flag to true.

If the call to the Handle entity fails for some reason, the UserCreated event is delivered to that same method again. When we listen to events from an Event Sourced Entity, delivery is guaranteed, although possibly after a short delay. The method that handles the event must succeed; if it fails, the event is delivered once more. In other words, listening to events has at-least-once semantics.
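At-least-once delivery means the handler can run more than once for the same event, so the confirmation should be safe to repeat. The following is a minimal, Kalix-free sketch of that idea. The class name, the in-memory map standing in for the Handle entity, and the retry loop are all assumptions made for illustration; they are not part of the project code or of the Kalix API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class AtLeastOnceSketch {

  /** In-memory stand-in for the Handle entity: handle -> confirmed flag. */
  static final Map<String, Boolean> handles = new HashMap<>();

  /** Counts how many times the handler was invoked. */
  static int attempts = 0;

  /** Idempotent: confirming an already confirmed handle leaves the state unchanged. */
  static void confirm(String handle) {
    handles.put(handle, true);
  }

  /** Handler that fails on its first invocation to simulate a transient error. */
  static void onUserCreated(String handle) {
    attempts++;
    if (attempts == 1) {
      throw new IllegalStateException("simulated transient failure");
    }
    confirm(handle);
  }

  /** Redelivers the same event until the handler completes without throwing. */
  static void deliverAtLeastOnce(String event, Consumer<String> handler) {
    while (true) {
      try {
        handler.accept(event);
        return;
      } catch (RuntimeException e) {
        // delivery failed: try again with the same event
      }
    }
  }

  public static void main(String[] args) {
    handles.put("renato", false); // reserved but unconfirmed
    deliverAtLeastOnce("renato", AtLeastOnceSketch::onUserCreated);
    // a duplicate delivery is harmless because confirm is idempotent
    deliverAtLeastOnce("renato", AtLeastOnceSketch::onUserCreated);
    System.out.println(handles.get("renato") + " after " + attempts + " attempts");
  }
}
```

Because confirm only ever sets the flag to true, a retried or duplicated delivery ends in the same state as a single successful one.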

Now let’s explore our options in case of failures. We said that a Handle could be reserved, but the User entity creation could fail leaving the handle in a locked state (reserved, but not in use).

The first easy solution is to say that if a handle gets locked because of such a failure, we leave it as is. Practically, we “burn” the handle. This is of course an easy and lazy solution, and we can do better than that.

Like with the UserWorkflow Action, we can create a HandleWorkflow Action. However, we will need to add a time dimension to it.

First, let’s write its gRPC definition in the file src/main/proto/com/example/workflow/workflow_handle_action.proto

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "kalix/annotations.proto";
import "com/example/handle/domain/handle_domain.proto";

package com.example.workflow;

option java_outer_classname = "WorkflowHandleActionApi";

service HandleWorkflowAction {
option (kalix.codegen) = {
  action: {
    name: "HandleWorkflow"
  }
};

option (kalix.service).acl.allow = { principal: ALL };

rpc OnHandleChange(com.example.handle.domain.HandleState) returns (google.protobuf.Empty) {
  option (kalix.method).eventing.in = {
    value_entity: "handle"
  };
}
}

This Action is very similar to the UserWorkflow one, except that it listens to value changes from the Handle Value Entity. Listening to value changes is a little different from listening to events. When listening to the events of an Event Sourced Entity, we receive each event in the order it occurred. When listening to Value Entity changes, we receive the state of the value entity.

Another aspect to take into consideration is that we don’t necessarily receive every single value change. When Kalix fetches data to deliver to your function, it’s possible that some intermediary states are not delivered. For example, if our value entity is a counter and you listen to its state changes, you may see the counter increasing incrementally (1, 2, 3, 4, etc.) or you may only see 1, 3, 4, etc.

That also holds for our Handle Entity. There are only two possible states for it: it’s either created but unconfirmed, or it is created and confirmed. That said, our implementation for the OnHandleChange method will take that into account.

When we see the state as confirmed, we know that the User entity was correctly created, the UserWorkflow received the UserCreated event and changed the Handle from unconfirmed to confirmed.

On the other hand, when we see the state as unconfirmed, we can’t be sure whether the User entity was created but the UserWorkflow didn’t get the chance to confirm the handle yet, or whether the User entity was never registered at all and the handle is in a locked state. Therefore we need to add a time dimension to our workflow.
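Since intermediate states may be skipped, the handler is easiest to reason about when it decides from the delivered state alone and never from the transition that led to it. The sketch below illustrates this with plain Java; the class, the Decision enum, and the lists of booleans standing in for delivered HandleState values are hypothetical names chosen for the example.

```java
import java.util.List;

public class HandleChangeDecisionSketch {

  enum Decision { NOTHING_TO_DO, SCHEDULE_EXPIRATION }

  /** Decides from the delivered state alone, not from how the state was reached. */
  static Decision decide(boolean confirmed) {
    return confirmed ? Decision.NOTHING_TO_DO : Decision.SCHEDULE_EXPIRATION;
  }

  /** Returns the decision taken for the last state of a delivery sequence (null if empty). */
  static Decision lastDecision(List<Boolean> deliveredStates) {
    Decision last = null;
    for (boolean confirmed : deliveredStates) {
      last = decide(confirmed);
    }
    return last;
  }

  public static void main(String[] args) {
    // every change is delivered: unconfirmed first, then confirmed
    System.out.println(lastDecision(List.of(false, true)));
    // the intermediate state is skipped: only the confirmed state is delivered
    System.out.println(lastDecision(List.of(true)));
    // the user creation failed: the handle stays unconfirmed
    System.out.println(lastDecision(List.of(false)));
  }
}
```

The first two sequences end in the same decision even though one of them never showed the unconfirmed state. Only the third one, where no confirmation ever arrives, needs the time-based follow-up.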

Before we move on, we will extend the Handle entity with one extra method. In addition to Reserve and Confirm, we will add an Expire method.

syntax = "proto3";

import "google/protobuf/empty.proto";
import "kalix/annotations.proto";

package com.example.handle;

option java_outer_classname = "HandleApi";

message Handle {
string handle_id = 1 [(kalix.field).entity_key = true];
}

service HandleService {
option (kalix.codegen) = {
  value_entity: {
    name: "com.example.handle.domain.HandleEntity"
    entity_type: "handle"
    state: "com.example.handle.domain.HandleState"
  }
};
option (kalix.service).acl.allow = { principal: ALL };

rpc Reserve(Handle) returns (google.protobuf.Empty);
rpc Confirm(Handle) returns (google.protobuf.Empty);
rpc Expire(Handle) returns (google.protobuf.Empty);
}

Its implementation will be:

@Override
public Effect<Empty> expire(HandleDomain.HandleState currentState, HandleApi.Handle handle) {
// do nothing if already confirmed
if (currentState.getConfirmed()) {
  return effects().reply(Empty.getDefaultInstance());
} else {
  return delete(currentState, handle);
}
}

When we call the expire method, we check whether the handle has already been confirmed. If so, we leave it untouched. If it is still unconfirmed, we delete the handle.
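The interplay between Reserve, Confirm, and Expire can be tried out without Kalix using a small in-memory model. This is only a sketch under stated assumptions: the class name and the map-based storage are hypothetical, and the behavior of confirm for an unknown handle is a choice made here, not something taken from the project code.

```java
import java.util.HashMap;
import java.util.Map;

public class HandleLifecycleSketch {

  /** handle -> confirmed flag; an absent key means the handle is free. */
  private final Map<String, Boolean> handles = new HashMap<>();

  /** Reserves the handle; returns false when it is already taken. */
  boolean reserve(String handle) {
    return handles.putIfAbsent(handle, false) == null;
  }

  /** Marks a reserved handle as confirmed; does nothing for unknown handles (assumption). */
  void confirm(String handle) {
    handles.computeIfPresent(handle, (key, confirmed) -> true);
  }

  /** Frees the handle only if it is still unconfirmed. */
  void expire(String handle) {
    handles.remove(handle, false);
  }

  public static void main(String[] args) {
    HandleLifecycleSketch entity = new HandleLifecycleSketch();

    // happy path: reserved, confirmed, then the expiration call arrives
    entity.reserve("alice");
    entity.confirm("alice");
    entity.expire("alice");

    // failure path: reserved, never confirmed, then the expiration call arrives
    entity.reserve("bob");
    entity.expire("bob");

    System.out.println(entity.reserve("alice")); // false: still owned
    System.out.println(entity.reserve("bob"));   // true: it was freed
  }
}
```

The conditional remove plays the role of the confirmed check in expire: a late expiration call can never take away a handle that is in use.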

Back to the HandleWorkflow, we will use a Kalix Timer to trigger the expiration.

@Override
public Effect<Empty> onHandleChange(HandleDomain.HandleState handleState) {

// it's not guaranteed to see all changes.
// If we see a confirmed handle, we don't need to do anything
// If we see an unconfirmed handle, we can schedule a timer to expire it
if (handleState.getConfirmed()) {
  return effects().reply(Empty.getDefaultInstance());
} else {
  CompletionStage<Empty> expirationTimer =
    timers()
      .startSingleTimer(
        "expire-timer-" + handleState.getHandle(),
        Duration.of(30, ChronoUnit.MINUTES),
        components().handleEntity().expire(HandleApi.Handle.newBuilder().setHandleId(handleState.getHandle()).build())
      )
      .thenApply(__ -> Empty.getDefaultInstance());

  return effects().asyncReply(expirationTimer);
}
}

In HandleWorkflow, the onHandleChange method first checks whether the handle has been confirmed in the meantime. If so, we do nothing. If it is still unconfirmed, we schedule a call to expire it. In our example we schedule it to run in 30 minutes, but it could be any time in the future.

When the call is triggered, it will delete the handle if not already confirmed.
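To see the timing behavior in isolation, here is a small simulation with a manually advanced clock. It is a sketch, not the Kalix timer implementation: the class, its startSingleTimer and advance methods, and the rule that reusing a timer name replaces the earlier registration are assumptions made for this example.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpirationTimerSketch {

  /** A scheduled call: the instant it becomes due and the work to run. */
  private static final class Scheduled {
    final Duration dueAt;
    final Runnable call;

    Scheduled(Duration dueAt, Runnable call) {
      this.dueAt = dueAt;
      this.call = call;
    }
  }

  private final Map<String, Scheduled> timers = new HashMap<>();
  private Duration now = Duration.ZERO; // manual clock, advanced by the caller

  /** Registers a one-off call; in this sketch, reusing a name replaces the earlier one. */
  void startSingleTimer(String name, Duration delay, Runnable call) {
    timers.put(name, new Scheduled(now.plus(delay), call));
  }

  /** Moves the clock forward and runs every timer that has become due. */
  void advance(Duration elapsed) {
    now = now.plus(elapsed);
    List<Runnable> due = new ArrayList<>();
    timers.values().removeIf(timer -> {
      boolean isDue = timer.dueAt.compareTo(now) <= 0;
      if (isDue) {
        due.add(timer.call);
      }
      return isDue;
    });
    due.forEach(Runnable::run);
  }

  public static void main(String[] args) {
    Map<String, Boolean> handles = new HashMap<>(); // handle -> confirmed
    handles.put("alice", false);
    handles.put("bob", false);

    ExpirationTimerSketch timers = new ExpirationTimerSketch();
    for (String handle : List.of("alice", "bob")) {
      // expire: remove the handle only if it is still unconfirmed
      timers.startSingleTimer(
          "expire-timer-" + handle,
          Duration.ofMinutes(30),
          () -> handles.remove(handle, false));
    }

    timers.advance(Duration.ofMinutes(10));
    handles.put("alice", true); // the confirmation for alice arrives in time
    timers.advance(Duration.ofMinutes(20));

    System.out.println(handles); // {alice=true}
  }
}
```

Both timers fire after 30 minutes, but only the unconfirmed handle is removed. The decision is taken when the timer fires, not when it is scheduled.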

Could we make it even more robust? One could argue that 30 minutes is not enough. We could imagine that the confirmation gets blocked because of some silly bug in the UserWorkflow, causing it to never confirm any handle. If that’s the case, the handle confirmation could be hanging for a long time until a human operator realizes that there is a bug. In the meantime, while handle confirmation is not passing through, the HandleWorkflow could potentially start to delete handles that are effectively in use.

One extra level of security that we could implement is to add the userId to the Handle. The Handle would then have the handle field as primary key and the userId as a reference to the originating User account. The HandleWorkflow could then check whether that User entity exists before even scheduling an expiration call. This would prevent the expiration/deletion of any handle that is linked to a User account.
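A rough sketch of that extra guard could look as follows. Everything in it is hypothetical: the extended HandleState with a userId field, the shouldScheduleExpiration helper, and the set of existing user ids, which stands in for a lookup against the User entity that a real implementation would have to perform.

```java
import java.util.Set;

public class OwnerCheckSketch {

  /** Hypothetical handle state extended with a reference to the owning user. */
  static final class HandleState {
    final String handle;
    final boolean confirmed;
    final String userId;

    HandleState(String handle, boolean confirmed, String userId) {
      this.handle = handle;
      this.confirmed = confirmed;
      this.userId = userId;
    }
  }

  /**
   * Schedules an expiration only for unconfirmed handles whose user does not exist.
   * The set stands in for a call to the User entity.
   */
  static boolean shouldScheduleExpiration(HandleState state, Set<String> existingUserIds) {
    if (state.confirmed) {
      return false;
    }
    return !existingUserIds.contains(state.userId);
  }

  public static void main(String[] args) {
    Set<String> existingUserIds = Set.of("user-1");

    // unconfirmed, but the user exists: the confirmation is merely delayed
    HandleState delayed = new HandleState("alice", false, "user-1");
    // unconfirmed and no such user: the handle is really locked
    HandleState locked = new HandleState("bob", false, "user-2");

    System.out.println(shouldScheduleExpiration(delayed, existingUserIds));
    System.out.println(shouldScheduleExpiration(locked, existingUserIds));
  }
}
```

With this guard, a bug that blocks confirmations no longer leads to handles being deleted while a User account still refers to them.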

If we go back to our original User entity, it also had an email address. The same technique can be applied to guarantee email address uniqueness across all User entities.

We’ll close this series with an exercise suggestion. What would it take to extend the functionality so that a user can change their handle? Which methods and logic can be reused to implement that use case?

We hope you enjoyed the series.
