Unified Price Enrichment Pipeline: Normalize, Derive, Fetch
Hey guys! Today, we're diving deep into Phase 3 of our project to add FX rate tracking for multi-currency support. This is all about creating a unified prices enrich
command that streamlines how we handle prices. We're talking normalizing, deriving, and fetching β all in one smooth pipeline. Let's break it down!
Overview
The goal is to create a unified prices enrich
command with three sequential stages:
- Normalize: Convert non-USD fiat prices to USD using FX providers.
- Derive: Extract prices from USD trades (actual USD only, not stablecoins).
- Fetch: Fetch missing crypto/stablecoin prices from external providers.
Key Architectural Constraints
Critical: USD-Only in priceAtTxTime.price
It's super important that priceAtTxTime.price.currency
MUST be USD after enrichment. We never want to store EUR, CAD, GBP, or any other non-USD currency in this field. Original currency conversions are tracked via FX metadata (fxRateToUSD
, fxSource
, fxTimestamp
).
Before enrichment:
priceAtTxTime
is undefined OR contains price in original currency (EUR, CAD, etc.)
After enrichment:
priceAtTxTime.price.currency
MUST be USD- Never store EUR, CAD, GBP, or any non-USD currency in this field
- Original currency conversions tracked via FX metadata (
fxRateToUSD
,fxSource
,fxTimestamp
)
Stablecoin Handling (IMPORTANT)
Do NOT assume 1:1 USD peg for stablecoins (USDC, USDT, DAI, etc.).
- Stage 2 (Derive) should ONLY work with actual USD.
- Stablecoins are treated as crypto assets and fetched in Stage 3.
- Rationale: Historical de-peg events (UST collapse, USDC brief de-peg to $0.98).
- We need actual historical prices, not assumptions.
Implementation Tasks
1. Update Core Domain Model
File: packages/core/src/types/currency.ts
No changes needed β do NOT add isUsdEquivalent()
method. Keep it simple, guys!
2. Update Price Calculation Utils
File: packages/accounting/src/price-enrichment/price-calculation-utils.ts
Update calculatePriceFromTrade()
:
// BEFORE (current)
if (inflowCurrency.isFiatOrStablecoin()) { ... } // TRUE for USD, EUR, CAD, USDC
if (outflowCurrency.isFiatOrStablecoin()) { ... }
// AFTER (Phase 3)
const inflowIsUSD = inflowCurrency.toString() === 'USD';
const outflowIsUSD = outflowCurrency.toString() === 'USD';
// Only derive when exactly ONE side is actual USD
// Skip: EUR trades (normalized in Stage 1)
// Skip: USDC trades (fetched in Stage 3 with actual historical prices)
if (outflowIsUSD && !inflowIsUSD) { ... } // Buy crypto with USD
if (inflowIsUSD && !outflowIsUSD) { ... } // Sell crypto for USD
3. Create Price Normalization Service
File: packages/accounting/src/price-enrichment/price-normalization-service.ts
This service is crucial for ensuring all prices are in USD. The PriceNormalizationService
class is designed to normalize all non-USD fiat prices to USD, ensuring consistency and accuracy in our financial data. This process involves identifying movements with priceAtTxTime
where the currency is not USD, fetching the appropriate FX rate, converting the price amount to USD, and populating the necessary FX metadata. This service plays a pivotal role in our unified price enrichment pipeline by ensuring all fiat currencies are converted to USD, streamlining subsequent data processing and analysis. The service must handle scenarios where FX rates are unavailable, logging warnings and providing graceful degradation to maintain system stability and data integrity. Consider it the backbone of our multi-currency support, as it guarantees all financial calculations are based on a single, standardized currency, enabling more reliable and consistent reporting and analysis across the board. It's a pretty big deal!
class PriceNormalizationService {
/**
* Normalize all non-USD fiat prices to USD
*
* Process:
* 1. Find movements with priceAtTxTime where currency != 'USD'
* 2. For each non-USD fiat price (EUR, CAD, GBP):
* a. Fetch FX rate via PriceProviderManager (EURβUSD at tx time)
* b. Convert price.amount to USD
* c. Populate fxRateToUSD, fxSource, fxTimestamp metadata
* d. Update price.currency to 'USD'
* 3. Skip crypto prices (they shouldn't exist yet, but log warning)
* 4. Graceful degradation: warn on missing FX rates
*/
async normalize(options?: { interactive?: boolean }): Promise<Result<NormalizeResult, Error>>;
}
Returns:
interface NormalizeResult {
movementsNormalized: number;
movementsSkipped: number; // Already USD
failures: number;
errors: string[];
}
4. Create Unified Enrich Handler
File: apps/cli/src/features/prices/prices-enrich-handler.ts
The PricesEnrichHandler
class is the conductor of our price enrichment orchestra, orchestrating the normalization, derivation, and fetching of prices in a cohesive sequence. This handler determines which stages to execute based on the provided options, ensuring that the correct services are invoked in the proper order. The core of its functionality lies in the execute
method, which receives an options object that dictates which stages of the enrichment pipeline should be run. It then instantiates the necessary services, such as PriceNormalizationService
and PriceEnrichmentService
, and executes their respective methods to process the price data. It plays a critical role in maintaining the overall flow of the price enrichment process, ensuring that each stage is executed efficiently and effectively. Itβs designed to be flexible and adaptable, allowing users to run specific stages independently or execute the entire pipeline in one go. Think of it as the control center for all things price enrichment!
class PricesEnrichHandler {
async execute(options: {
asset?: string[];
interactive?: boolean;
normalizeOnly?: boolean;
deriveOnly?: boolean;
fetchOnly?: boolean;
}): Promise<Result<EnrichResult, Error>> {
const stages = {
normalize: !options.deriveOnly && !options.fetchOnly,
derive: !options.normalizeOnly && !options.fetchOnly,
fetch: !options.normalizeOnly && !options.deriveOnly,
};
// Stage 1: Normalize
if (stages.normalize) {
const normalizeService = new PriceNormalizationService(db, priceManager);
result.normalize = await normalizeService.normalize({ interactive: options.interactive });
}
// Stage 2: Derive
if (stages.derive) {
const enrichmentService = new PriceEnrichmentService(transactionRepo, linkRepo);
result.derive = await enrichmentService.enrichPrices();
}
// Stage 3: Fetch
if (stages.fetch) {
const fetchHandler = new PricesFetchHandler(db);
result.fetch = await fetchHandler.execute({
asset: options.asset,
interactive: options.interactive,
});
}
return ok(result);
}
}
5. Register Enrich Command
File: apps/cli/src/features/prices/prices.ts
Update to register new prices enrich
command:
registerPricesViewCommand(prices);
registerPricesEnrichCommand(prices); // NEW - primary workflow
registerPricesDeriveCommand(prices); // Keep for granular control
registerPricesFetchCommand(prices); // Keep for granular control
File: apps/cli/src/features/prices/prices-enrich.ts
(NEW)
export function registerPricesEnrichCommand(pricesCommand: Command): void {
pricesCommand
.command('enrich')
.description('Enrich prices via normalize β derive β fetch pipeline')
.option('--asset <currency>', 'Filter by asset (e.g., BTC, ETH). Can be specified multiple times.', collect, [])
.option('--interactive', 'Enable interactive mode for manual entry when prices/FX rates unavailable')
.option('--normalize-only', 'Only run normalization stage (FX conversion)')
.option('--derive-only', 'Only run derivation stage (extract from USD trades)')
.option('--fetch-only', 'Only run fetch stage (external providers)')
.option('--json', 'Output results in JSON format')
.action(async (options) => { ... });
}
6. Create Pure Utility Functions
File: apps/cli/src/features/prices/prices-normalize-utils.ts
(NEW)
Pure functions for normalization logic:
extractMovementsNeedingNormalization(tx: UniversalTransaction): AssetMovement[]
validateFxRate(rate: Decimal): Result<void, Error>
createNormalizedPrice(original: Money, fxRate: Decimal, fxSource: string): PriceAtTxTime
7. Tests
New test files:
apps/cli/src/features/prices/__tests__/prices-enrich-handler.test.ts
apps/cli/src/features/prices/__tests__/prices-normalize-utils.test.ts
packages/accounting/src/price-enrichment/__tests__/price-normalization-service.test.ts
Update existing:
packages/accounting/src/price-enrichment/__tests__/price-calculation-utils.test.ts
- Test ONLY USD trades derive prices
- Test EUR trades are skipped (normalized separately)
- Test USDC trades are skipped (fetched with actual prices)
Test scenarios:
- EUR trade β normalize to USD with ECB rate
- CAD trade β normalize to USD with Bank of Canada rate
- USD trade β derive price (no normalization needed)
- USDC trade β skip derive, fetch actual USDC price from provider
- Full pipeline: EUR trade β normalize β derive other assets β fetch remaining
- Graceful degradation: missing FX rate handling
- De-peg scenario: USDC at $0.98 (not $1.00)
Examples
Example 1: EUR Trade Flow
After import/process (before enrich):
{
"movements": {
"inflows": [{
"asset": "BTC",
"amount": "1.0",
"priceAtTxTime": {
"price": { "amount": "40000", "currency": "EUR" },
"source": "exchange-execution",
"fetchedAt": "2023-01-15T10:00:00Z",
"granularity": "exact"
}
}],
"outflows": [{ "asset": "EUR", "amount": "40000" }]
}
}
After Stage 1 (normalize):
{
"movements": {
"inflows": [{
"asset": "BTC",
"amount": "1.0",
"priceAtTxTime": {
"price": { "amount": "43200", "currency": "USD" }, // Converted!
"source": "exchange-execution",
"fetchedAt": "2023-01-15T10:00:00Z",
"granularity": "exact",
"fxRateToUSD": "1.08",
"fxSource": "ecb",
"fxTimestamp": "2023-01-15T10:00:00Z"
}
}],
"outflows": [{ "asset": "EUR", "amount": "40000" }]
}
}
Example 2: USDC Trade Flow (De-peg Safe)
After Stage 2 (derive) - SKIPPED:
{
"movements": {
"inflows": [{ "asset": "BTC", "amount": "1.0" }], // No price yet
"outflows": [{ "asset": "USDC", "amount": "50000" }]
}
}
After Stage 3 (fetch) - BOTH fetched:
{
"movements": {
"inflows": [{
"asset": "BTC",
"amount": "1.0",
"priceAtTxTime": {
"price": { "amount": "50127.45", "currency": "USD" },
"source": "coingecko",
"fetchedAt": "2025-01-01T...",
"granularity": "minute"
}
}],
"outflows": [{
"asset": "USDC",
"amount": "50000",
"priceAtTxTime": {
"price": { "amount": "0.9998", "currency": "USD" }, // Actual price!
"source": "coingecko",
"fetchedAt": "2025-01-01T...",
"granularity": "minute"
}
}]
}
}
Example 3: USD Trade Flow
After Stage 2 (derive) - SUCCESS:
{
"movements": {
"inflows": [{
"asset": "BTC",
"amount": "1.0",
"priceAtTxTime": {
"price": { "amount": "50000", "currency": "USD" }, // Derived!
"source": "exchange-execution",
"fetchedAt": "2023-01-15T10:00:00Z",
"granularity": "exact"
}
}],
"outflows": [{ "asset": "USD", "amount": "50000" }]
}
}
Open Questions
- Interactive FX entry: Should Stage 1 support
--interactive
for manual FX rate entry when provider unavailable?- Recommendation: Yes, for graceful degradation (similar to
prices fetch --interactive
)
- Recommendation: Yes, for graceful degradation (similar to
- Transaction linking: Should normalized prices propagate through confirmed transaction links?
- Status: Unclear, may defer to separate issue
- Command deprecation: Keep
prices derive
andprices fetch
as standalone commands, or deprecate in favor ofprices enrich
?- Recommendation: Keep all for flexibility, document
prices enrich
as primary
- Recommendation: Keep all for flexibility, document
Acceptance Criteria
- [ ]
prices enrich
command normalizes all non-USD fiat prices to USD - [ ] Only actual USD trades (not stablecoins) derive prices
- [ ] Stablecoins (USDC, USDT, DAI) are fetched with actual historical prices from providers
- [ ] FX metadata (
fxRateToUSD
,fxSource
,fxTimestamp
) correctly populated for normalized prices - [ ] All stages can run independently via
--normalize-only
,--derive-only
,--fetch-only
- [ ] Full pipeline can run in sequence with single command
- [ ]
--interactive
mode supports manual FX rate entry - [ ] Comprehensive test coverage including de-peg scenarios
- [ ] Build passes, all tests pass
- [ ] No tech debt introduced
Related
- #153 - Parent issue: Add FX rate tracking for multi-currency support
- ADR-003 - Unified Price and FX Enrichment Architecture