Feature-Data-Service
What does feature data service do?
Feature data service is a job that reads events from a Kinesis stream, extracts variables from each event, and publishes messages to RabbitMQ so that the Feature read service can update the feature database (currently MongoDB, soon to be SQL Server).
risk-event-processing-config.json
In the risk-configuration repo, there is a configuration file named "risk-event-processing-config.json". This file tells FeatureDataService how to filter, extract, and transform variables from events. The file is now generated by a DBT model, so do not edit it manually unless you need an urgent fix on prod. Keep in mind that any manual change to this file will later be overwritten by the DBT model.
Here is a snippet:
{
  "Events": {
    "beneficiaryId/RemittanceTransactionSubmitEvent/sentAmount": {
      "Sources": [
        {
          "AggregatePoint": "BeneficiaryId",
          "SourceEvent": "RemittanceTransactionSubmitEvent",
          "Types": [
            "record"
          ],
          "VariableName": "SentAmount"
        },
        {
          "AggregatePoint": "BeneficiaryId",
          "SourceEvent": "RemittanceTransactionSubmitEvent",
          "Types": [
            "stats"
          ],
          "VariableName": "SentAmount"
        }
      ]
    }
  }
}
Events is a dictionary. Each item key describes how the variable is organized in the risk database, and each item value is an object that specifies where the information comes from. As the snippet shows, the key has the form aggregationPoint/eventAliasName/variableName.
Events Key
- aggregationPoint identifies an object in the real world, for example a userId or a beneficiaryId.
- eventAliasName is the category of the variable. "Variable category" would be a better name, but "event alias name" has been used since the beginning, so we keep it.
- variableName is the variable name used in the risk database.
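The three key parts listed above can be pulled apart with a small helper. This is a minimal sketch, assuming the parts are always separated by "/" as in the snippet; `EventsKey` and `parse_events_key` are hypothetical names used for illustration and are not part of the service.

```python
from typing import NamedTuple


class EventsKey(NamedTuple):
    """The three parts of an Events key (hypothetical helper type)."""
    aggregation_point: str
    event_alias_name: str
    variable_name: str


def parse_events_key(key: str) -> EventsKey:
    """Split a key of the form aggregationPoint/eventAliasName/variableName."""
    parts = key.split("/")
    # Reject keys that do not have exactly three non-empty parts.
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected three non-empty '/'-separated parts, got {key!r}")
    return EventsKey(*parts)


key = parse_events_key("beneficiaryId/RemittanceTransactionSubmitEvent/sentAmount")
print(key.aggregation_point, key.event_alias_name, key.variable_name)
```

Failing fast on a malformed key is a design choice of this sketch; the real service may handle such keys differently.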
Events Value
The value contains a Sources property, which is a list of sources. The properties of a source object are:
- AggregatePoint: the source field that provides the value for the aggregation point.
- SourceEvent: the name of the source event.
- Types: how the data is stored in risk. The risk system supports a list of types here. However, the configuration file is now generated by a DBT model, and the DBT parser is limited in its ability to output multiple types, so the array always holds a single value.
- VariableName: the source field from which feature data service extracts the value.
- transactionKeyFields: part of transaction trace. It names the field in the source event that contains the transactionId. It is called "Fields" because it accepts multiple field names, separated by commas.
- transactionStatus: part of transaction trace. It sets the transaction status directly.
- transactionStatusField: part of transaction status. It fetches the transaction status from a field in the source event. Not used now because of a limitation in the DBT model parser, but the risk system still supports it.
- transactionStatusMap: part of transaction status. Not used now because of a limitation in the DBT model parser, but the risk system still supports it.
- Idempotent: part of transaction trace. It controls whether risk accepts duplicate requests. Risk detects duplicates by combining transactionId and transactionStatus.
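To make the roles of SourceEvent, AggregatePoint, and VariableName concrete, here is a minimal sketch of applying one source object to one event. The event shape (an `EventName` plus a `Payload` dictionary), the output field names, and the `extract_variable` function are assumptions made for illustration; the actual FeatureDataService code may look quite different.

```python
from typing import Optional


def extract_variable(source: dict, event: dict) -> Optional[dict]:
    """Apply one Sources entry to one event; return None if it does not apply."""
    # Assumed event shape: {"EventName": "...", "Payload": {...}}.
    if event.get("EventName") != source["SourceEvent"]:
        return None  # filter: this source only reads its own source event
    payload = event.get("Payload", {})
    aggregate_value = payload.get(source["AggregatePoint"])
    variable_value = payload.get(source["VariableName"])
    if aggregate_value is None or variable_value is None:
        return None  # the event lacks a field this source needs
    return {
        "aggregatePointValue": aggregate_value,
        "variableValue": variable_value,
        "types": list(source["Types"]),
    }


source = {
    "AggregatePoint": "BeneficiaryId",
    "SourceEvent": "RemittanceTransactionSubmitEvent",
    "Types": ["record"],
    "VariableName": "SentAmount",
}
event = {
    "EventName": "RemittanceTransactionSubmitEvent",
    "Payload": {"BeneficiaryId": "b-1", "SentAmount": 100},
}
print(extract_variable(source, event))
```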
Transaction Trace
Transaction trace is a feature that traces related activities, such as the events belonging to one transaction. It was created to:
- skip duplicate events, which occur when an application retries or when risk has to read values from multiple events that overlap
- get the correct velocity value when a transaction starts and ends on different dates. For example, a user sends $100 yesterday, but the provider rejects it today. Without transaction trace, the user's last1d amount would become -$100. Transaction trace links the Rollback record to its Initialize record to get the correct value.
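Both goals above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the status strings "Initialize" and "Rollback", the per-date totals, and the `TransactionTrace` class are hypothetical, and the real risk system stores and aggregates data differently.

```python
from collections import defaultdict
from datetime import date


class TransactionTrace:
    """Toy model: deduplicate events and attribute rollbacks to the original date."""

    def __init__(self) -> None:
        self.daily_amount = defaultdict(float)  # date -> net sent amount
        self._seen = set()                      # (transactionId, transactionStatus)
        self._initialized = {}                  # transactionId -> (date, amount)

    def apply(self, transaction_id: str, status: str, amount: float, event_date: date) -> bool:
        """Apply one event; return False if it was skipped."""
        key = (transaction_id, status)
        if key in self._seen:
            return False  # duplicate, for example an application retry
        if status == "Initialize":
            self._initialized[transaction_id] = (event_date, amount)
            self.daily_amount[event_date] += amount
        elif status == "Rollback":
            origin = self._initialized.get(transaction_id)
            if origin is None:
                return False  # no Initialize record to link to (assumed handling)
            origin_date, origin_amount = origin
            # Undo the amount on the date it was originally counted.
            self.daily_amount[origin_date] -= origin_amount
        self._seen.add(key)
        return True


trace = TransactionTrace()
yesterday, today = date(2024, 1, 1), date(2024, 1, 2)
trace.apply("t1", "Initialize", 100.0, yesterday)
trace.apply("t1", "Initialize", 100.0, yesterday)  # retry, skipped
trace.apply("t1", "Rollback", 100.0, today)
print(dict(trace.daily_amount))
```

In this model the rollback cancels yesterday's amount instead of creating a negative amount for today, which mirrors the -$100 example.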
Kinesis Shards
We now have two shards in Kinesis to speed up event processing. When DW pushes events to Kinesis, they are partitioned by the userId property, which decides which shard each event goes to.
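Kinesis maps a partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and finding the shard whose hash key range contains it. The sketch below imitates that mapping locally. It assumes the two shards split the hash key space evenly and that the digest is read as a big-endian integer; the real ranges come from the stream's shard description, and the function names are hypothetical.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1


def even_hash_ranges(shard_count: int) -> list:
    """Split the 128-bit hash key space into contiguous, equally sized ranges."""
    size = (MAX_HASH_KEY + 1) // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        # The last range absorbs any remainder so the whole space is covered.
        end = MAX_HASH_KEY if i == shard_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def shard_index_for(partition_key: str, ranges: list) -> int:
    """Return the index of the range that contains the key's MD5 hash."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")  # assumed byte order
    for index, (start, end) in enumerate(ranges):
        if start <= hash_key <= end:
            return index
    raise ValueError("hash ranges do not cover the key space")


ranges = even_hash_ranges(2)
for user_id in ["user-1", "user-2", "user-3"]:
    print(user_id, "-> shard", shard_index_for(user_id, ranges))
```

Because the mapping depends only on the key, all events for the same userId land on the same shard.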
When Feature data service starts, it queries Kinesis for the number of shards and then starts one thread per shard. We have two threads because we have two shards. Health check code ensures that if any thread crashes, the feature data service restarts.
Because the service queries Kinesis for shard information only at startup, it must be restarted after the shard count changes so that it picks up the correct number. Given how Kinesis changes the shard count, our service should crash and restart automatically, and then pick up the correct shard count. However, we have not tested this yet.
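The startup sequence described above (query the shards once, start one thread per shard, report unhealthy when any thread dies) can be sketched as follows. `list_shard_ids` is a hypothetical stand-in for the Kinesis query, and `process_shard` is a placeholder loop, so this illustrates the pattern rather than the service's actual code.

```python
import threading
import time


def list_shard_ids() -> list:
    """Stand-in for the shard query done once at startup (hypothetical)."""
    return ["shard-0", "shard-1"]


def process_shard(shard_id: str, stop: threading.Event) -> None:
    """Placeholder worker; a real worker would read records from its shard."""
    while not stop.is_set():
        time.sleep(0.01)


def start_workers(shard_ids: list, stop: threading.Event) -> list:
    """Start one daemon thread per shard and return the threads."""
    workers = []
    for shard_id in shard_ids:
        worker = threading.Thread(
            target=process_shard, args=(shard_id, stop), name=shard_id, daemon=True
        )
        worker.start()
        workers.append(worker)
    return workers


def is_healthy(workers: list) -> bool:
    """Health check: every shard thread must still be running."""
    return all(worker.is_alive() for worker in workers)


stop_event = threading.Event()
shard_workers = start_workers(list_shard_ids(), stop_event)
print("healthy:", is_healthy(shard_workers))
stop_event.set()
for worker in shard_workers:
    worker.join(timeout=1)
print("healthy after stop:", is_healthy(shard_workers))
```

In this sketch a failing health check is what an orchestrator would use to restart the process, at which point the shard list is queried again.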