A lightweight, low-overhead concurrency library.
Utilities to create and run tasks.
import * as co from 'ciorent';
// Simulated job: sleeps for (10 - id) * 20 + 50 ms, so larger ids wait less,
// then reports that it is done.
const task = async (id: number) => {
  const delayMs = (10 - id) * 20 + 50;
  await co.sleep(delayMs);
  console.log('Task', id, 'done');
};
// Spawn 5 tasks and run them sequentially
console.log('Running 5 tasks sequentially:');
await co.sequential(5, task);

// Spawn 5 tasks at once, then wait until all of them have finished
console.log('Running 5 tasks concurrently:');
const running = co.spawn(5, task);
await Promise.all(running);
Cross-runtime synchronous and asynchronous sleep functions.
import * as co from 'ciorent';
// Logs `label` followed by the current performance.now() reading, rounded down to whole ms.
const logTime = (label: string) => {
  const elapsedMs = Math.floor(performance.now());
  console.log(label + ': ' + elapsedMs + 'ms');
};
const pauseMs = 500;

logTime('Start');

// Non-blocking sleep
await co.sleep(pauseMs);
logTime('After about 0.5s');

// Synchronous sleep: this blocks the event loop.
// In the browser it only works in workers, where it blocks the worker thread.
co.sleepSync(pauseMs);
logTime('After another 0.5s');
Postpones execution until after an idle period.
import * as co from 'ciorent';
// Callback to debounce: prints the id it was called with
const printId = (id: number) => {
  console.log('ID:', id);
};

// Debounced wrapper, configured with 500
const fn = co.debounce(printId, 500);

// fn(1) gets skipped: fn is called again only 100ms later
fn(1);
await co.sleep(100);
// fn(2) gets executed
fn(2);
Limits how often a function can execute within a given time interval.
import * as co from 'ciorent';
// Allow at most 2 calls every 500ms
const throttle = co.throttle(500, 2);

// Spawn 8 tasks; each one waits for the throttle, then prints its id and the current time
co.spawn(8, async (id) => {
  await throttle();
  const now = Math.floor(performance.now());
  console.log(`${id}: ${now}ms`);
});
Continues execution on the next tick, allowing other asynchronous tasks to run.
import * as co from 'ciorent';
// Logs `label` followed by the current performance.now() reading, rounded down to whole ms.
const logTime = (label: string) => {
  const elapsedMs = Math.floor(performance.now());
  console.log(label + ': ' + elapsedMs + 'ms');
};
// Expensive, CPU-bound task that periodically yields to the runtime
const task1 = async () => {
  // Between 15e6 and 16e6 iterations, to simulate a heavy operation
  const iterations = (Math.random() + 15) * 1e6;
  let x = 0;

  for (let i = 0; i < iterations; i++) {
    // Every 1e5 iterations (including the first), hand control back
    // to the runtime so it can schedule other tasks
    if (i % 1e5 === 0) await co.nextTick;

    x += Math.random() * 32 + i * Math.round(Math.random() * 16);
  }

  console.log('Task 1 result:', x);
};
// Short async task: logs the time before and after a fetch
const task2 = async (id: number) => {
  const tag = `Task 2.${id}`;
  logTime(`${tag} start fetching`);
  // A failed request is deliberately ignored
  await fetch('http://localhost:3000').catch(() => {});
  logTime(`${tag} done fetching`);
};
// Start the expensive task without awaiting it
void task1();
// Task 2 will not get blocked by task 1
co.spawn(5, task2);
Pubsub allows broadcasting messages to topics that can be received by subscribers.
import * as topic from 'ciorent/topic';
import * as co from 'ciorent';
const numbers = topic.init<number>();

// Spawn 3 tasks that each subscribe to the topic
co.spawn(3, async (id) => {
  const sub = topic.subscribe(numbers);

  for (;;) {
    const received = await topic.dispatch(sub);
    // Stop on a nullish message (flushing the topic sends undefined)
    if (received == null) break;
    console.log('Task', id, 'recieved:', received);
  }
});

// Publish 0, 1 and 2 to the topic, yielding to the subscribers after each message
for (const value of [0, 1, 2]) {
  topic.publish(numbers, value);
  await co.nextTick;
}

// Send undefined to every subscriber of the topic
topic.flush(numbers);
Semaphore is a concurrency primitive used to control access to a common resource by multiple processes.
import * as semaphore from 'ciorent/semaphore';
import * as co from 'ciorent';
// Only allow 2 tasks to run concurrently
const sem = semaphore.init(2);

const task = async (id: number) => {
  // Acquire the semaphore, waiting until it becomes available if necessary
  await semaphore.acquire(sem);
  console.log('Task', id, 'started');

  // Yield 5 times to let the main thread schedule other tasks
  for (let yields = 0; yields < 5; yields++) await co.nextTick;

  console.log('Task', id, 'end');

  // Release the semaphore
  semaphore.release(sem);
};

// Try to run 5 tasks concurrently
co.spawn(5, task);
Send and receive data asynchronously through streams.
import * as stream from 'ciorent/stream';
import * as co from 'ciorent';
const numbers = stream.init<number>();

// Spawn 3 tasks that read from the stream
co.spawn(3, async (id) => {
  for (;;) {
    const value = await stream.read(numbers);
    // Stop on a nullish value (flushing the stream sends undefined)
    if (value == null) break;
    console.log('Task', id, 'recieved:', value);
  }
});

// Write 0, 1 and 2 to the stream, yielding to the readers after each write
for (let n = 0; n !== 3; n += 1) {
  stream.write(numbers, n);
  await co.nextTick;
}

// Send undefined to the tasks still reading from the stream
stream.flush(numbers);
Wait for a value to be resolved.
import * as defer from 'ciorent/defer';
// Logs `label` followed by the current performance.now() reading, rounded down to whole ms.
const logTime = (label: string) => {
  const elapsedMs = Math.floor(performance.now());
  console.log(label + ': ' + elapsedMs + 'ms');
};
const deferredUrl = defer.init<string>();

// Waits for the URL to be provided, then fetches it
const task = async () => {
  // Waits here until the defer is resolved
  const url = await defer.wait(deferredUrl);

  logTime('Start fetching');
  // A failed request is deliberately ignored
  const ignoreFailure = () => {};
  await fetch(url).catch(ignoreFailure);
  logTime('Done fetching');
};

// Provides the URL that task is waiting for
const prepare = () => {
  // This always runs first, as task is still waiting
  logTime('Run before fetch');
  defer.resolve(deferredUrl, 'http://localhost:3000');
};

void task();
prepare();
Virtual threads with more controlled execution.
import * as co from 'ciorent';
import * as fiber from 'ciorent/fiber';
// Logs `label` followed by the current performance.now() reading, rounded down to whole ms.
const logTime = (label: string) => {
  const elapsedMs = Math.floor(performance.now());
  console.log(label + ': ' + elapsedMs + 'ms');
};
// Fiber 1: waits 1s, unwraps a resolved promise, then returns a random number
const f1 = fiber.fn(function* () {
  // Wait for a promise
  console.log('Fiber 1 waiting: 1s');
  yield co.sleep(1000);

  // Wait for a promise and take its result
  const pending = Promise.resolve(1);
  const value = yield* fiber.unwrap(pending);
  console.log('Fiber 1 recieved:', value);

  return Math.random();
});
{
  // Start the fiber process on next event loop cycle
  const fiber2 = fiber.spawn(function* (current) {
    // Start f1, wait for that process to complete and get its result
    console.log('Fiber 2: joins fiber 1');
    const joined = fiber.spawn(f1);
    const result = yield* fiber.join(joined);
    console.log('Fiber 2 recieved:', result);

    // Start f1 again and make its lifetime depend on the current fiber
    console.log('Fiber 2: spawns fiber 1');
    const child = fiber.spawn(f1);
    fiber.mount(child, current);
  });

  console.log('Fiber 2 started:', fiber.resumed(fiber2));

  // Wait for the fiber process to finish
  await fiber.done(fiber2);

  // Check the finish status
  console.log('Fiber 2 completed:', fiber.completed(fiber2));
}
{
  console.log('------------------------');

  const proc = fiber.spawn(f1);
  console.log('Fiber 1 started:', fiber.resumed(proc));

  // Interrupt the fiber; execution will be stopped on the last yield
  fiber.interrupt(proc);
  await fiber.done(proc);

  console.log('Fiber 1 interrupted:', fiber.interrupted(proc));
}
{
  console.log('------------------------');

  const proc = fiber.spawn(f1);
  logTime('Fiber 1 started');

  // Wait for a time period (500 here), then interrupt the fiber
  fiber.timeout(proc, 500);
  await fiber.done(proc);

  logTime('Fiber 1 interrupted');
}