Parallel Batch Translation

[view source code]

This example uses Flowcraft's .batch() helper to translate a document into several languages concurrently, showing how parallel execution cuts wall-clock time for I/O-bound work such as LLM calls.

The Goal

Translate a source README.md file into multiple languages (Spanish, German, etc.) in parallel and save each translation to the translations/ directory.

The Code

flow.ts

typescript
import type { NodeContext, NodeResult } from 'flowcraft'
import * as fs from 'node:fs/promises'
import * as path from 'node:path'
import { createFlow } from 'flowcraft'
import { callLLM } from './utils.js'

interface TranslationContext {
	'text': string
	'languages': string[]
	'output_dir': string
	'prepare-jobs': { language: string, text: string }[]
	'translations': { language: string, translation: string }[]
}

// 1. Prepare the list of translation jobs
async function prepareJobs(ctx: NodeContext<TranslationContext>): Promise<NodeResult> {
	const languages = (await ctx.context.get('languages'))!
	const text = (await ctx.context.get('text'))!
	// The output of this node is an array of objects, which the batch processor will iterate over.
	const jobs = languages.map(language => ({ language, text }))
	return { output: jobs }
}

// 2. This function will be executed FOR EACH item in the batch
async function translateItem(ctx: NodeContext<TranslationContext>): Promise<NodeResult> {
	// The `input` for a batch worker is a single item from the source array.
	const { language, text } = ctx.input as { language: string, text: string }
	const prompt = `
Translate the following markdown text into ${language}.
Preserve markdown formatting, links, and code blocks.
Return only the translated text.

Original Text:
${text}`

	console.log(`Translating to ${language}...`)
	const translation = await callLLM(prompt)
	console.log(`✓ Finished ${language}`)
	return { output: { language, translation } }
}

// 3. This node runs AFTER the entire batch is complete
async function saveResults(ctx: NodeContext<TranslationContext>): Promise<NodeResult> {
	// The `input` for the successor of a batch is an array of all worker outputs.
	const translations = ctx.input as { language: string, translation: string }[]
	const outputDir = (await ctx.context.get('output_dir'))!

	if (!translations || translations.length === 0) {
		console.warn('No translations to save.')
		return { output: 'Saved 0 files.' }
	}

	const promises = translations.map(({ language, translation }) => {
		const filename = path.join(outputDir, `README_${language.toUpperCase()}.md`)
		console.log(`Saving translation to ${filename}`)
		return fs.writeFile(filename, translation, 'utf-8')
	})

	await Promise.all(promises)
	return { output: `Saved ${translations.length} files.` }
}

export function createTranslateFlow() {
	const flow = createFlow<TranslationContext>('parallel-translation')

	// Define all the nodes first
	flow.node('prepare-jobs', prepareJobs)
		.node('save-results', saveResults, { inputs: 'translations' })

	// Define the batch operation.
	// This implicitly creates 'translate-batch_scatter' and 'translate-batch_gather' nodes.
	flow.batch('translate-batch', translateItem, {
		// The scatter node will read its list of items from the context key 'prepare-jobs',
		// which is the output of the node with that ID.
		inputKey: 'prepare-jobs',
		// The gather node will collect all worker results into an array and place it
		// in the context under the key 'translations'.
		outputKey: 'translations',
	})

	// Wire the graph edges to define the sequence of execution.
	// 1. Run 'prepare-jobs' first.
	// 2. The output of 'prepare-jobs' is used by 'translate-batch_scatter'.
	// 3. When 'translate-batch_gather' is complete, run 'save-results'.
	flow.edge('prepare-jobs', 'translate-batch_scatter')
	flow.edge('translate-batch_gather', 'save-results')

	return flow
}
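
flow.ts imports a callLLM helper from ./utils.js, which is not shown here. Below is a minimal sketch of what such a helper could look like, assuming the OpenAI Node SDK, a gpt-4o-mini model, and an OPENAI_API_KEY environment variable (loaded by dotenv in main.ts); the helper shipped with the example may target a different provider or model.

utils.ts

typescript
import OpenAI from 'openai'

// Hypothetical helper; the example's actual utils.js may differ.
// new OpenAI() picks up OPENAI_API_KEY from the environment.
const client = new OpenAI()

export async function callLLM(prompt: string): Promise<string> {
	const response = await client.chat.completions.create({
		model: 'gpt-4o-mini', // assumption: any chat-capable model works here
		messages: [{ role: 'user', content: prompt }],
	})
	return response.choices[0]?.message?.content ?? ''
}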

main.ts

typescript
import * as fs from 'node:fs/promises'
import * as path from 'node:path'
import process from 'node:process'
import dotenv from 'dotenv'
import { FlowRuntime } from 'flowcraft'
import { createTranslateFlow } from './flow.js'

dotenv.config()

async function main() {
	const sourceReadmePath = path.resolve(process.cwd(), '../../README.md')
	const outputDir = path.resolve(process.cwd(), 'translations')
	await fs.mkdir(outputDir, { recursive: true })

	// Keep only the README's intro and first '##' section so the prompt stays short.
	const text = (await fs.readFile(sourceReadmePath, 'utf-8'))
		.split('##')
		.slice(0, 2)
		.join('##')

	const languages = [
		'Spanish',
		'German',
		'Chinese',
		'Japanese',
		'Russian',
		'Portuguese',
		'French',
		'Korean',
	]

	const translateFlow = createTranslateFlow()
	const blueprint = translateFlow.toBlueprint()
	const functionRegistry = translateFlow.getFunctionRegistry()

	const runtime = new FlowRuntime({})

	console.log(`Starting parallel translation into ${languages.length} languages...`)
	const startTime = Date.now()

	await runtime.run(
		blueprint,
		{ text, languages, output_dir: outputDir },
		{ functionRegistry },
	)

	const duration = (Date.now() - startTime) / 1000
	console.log(`\nTotal parallel translation time: ${duration.toFixed(2)} seconds`)
	console.log('\n=== Translation Complete ===')
}

main().catch(console.error)
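
Once the run finishes, each language should have produced one file in translations/ (for example README_SPANISH.md and README_GERMAN.md, following the naming in saveResults). A quick, optional check:

typescript
import * as fs from 'node:fs/promises'

// List whatever the flow wrote to the output directory.
const files = await fs.readdir('translations')
console.log(`${files.length} translation files:`, files.join(', '))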

Performance Comparison

Running translations in parallel dramatically reduces the total execution time compared to a one-by-one sequential approach.

  • Sequential: ~60 seconds
  • Parallel (this example): ~10 seconds

(Actual times will vary with API response speed and system load.)
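
For a sense of where the sequential number comes from: without .batch(), each translation must wait for the previous one, so total time is roughly the sum of the individual LLM latencies. A plain-TypeScript sketch of that baseline (translateSequentially is a hypothetical helper written only for this comparison, reusing the same callLLM helper):

typescript
import * as fs from 'node:fs/promises'
import * as path from 'node:path'
import { callLLM } from './utils.js'

// Sequential baseline: one awaited LLM call at a time, so total time
// grows linearly with the number of languages.
async function translateSequentially(text: string, languages: string[], outputDir: string) {
	for (const language of languages) {
		const translation = await callLLM(`Translate the following markdown text into ${language}:\n\n${text}`)
		await fs.writeFile(path.join(outputDir, `README_${language.toUpperCase()}.md`), translation, 'utf-8')
	}
}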

This example shows how Flowcraft can orchestrate both sequential and parallel tasks in a single, readable definition, making it a great fit for data processing pipelines.

