Channel[T] — основной примитив межфибровой коммуникации. Модель — capability-split (Rust mpsc-style): Channel.new(cap) возвращает пару объектов с разделёнными правами — ChanWriter[T] («только слать») и ChanReader[T] («только получать»).

select { ... } — multiplexed channel operations: ожидает несколько recv/send одновременно, просыпается по первому готовому arm'у.


Quickstart

test "channel: send + recv FIFO" {
    let { tx, rx } = Channel.new(4)
    tx.send(10)
    tx.send(20)
    tx.send(30)
    let a = rx.recv()
    let b = rx.recv()
    let c = rx.recv()
    assert(a.unwrap_or(-1) == 10)
    assert(b.unwrap_or(-1) == 20)
    assert(c.unwrap_or(-1) == 30)
    tx.close()
}
test "select: data wins over timeout" {
    let ch = Channel.new(1)
    let tx = ch.tx
    let rx = ch.rx
    let mut branch = 0
    supervised {
        spawn {
            tx.send(99)
            select {
                Some(v) = rx                                          => { branch = v }
                Some(_) = ChanReader.close_after(Duration.from_millis(200)) => { branch = -1 }
            }
        }
    }
    assert(branch == 99)
}

Channel.new

fn Channel[T].new(capacity int) -> { tx ChanWriter[T], rx ChanReader[T] }

Возвращает пару — record с полями tx (writer-capability) и rx (reader-capability). Поддерживает четыре формы извлечения:

// 1. Record-destructure (Plan 53, наиболее идиоматично)
let { tx, rx } = Channel.new(4)

// 2. Record-destructure с переименованием
let { tx: sender, rx: receiver } = Channel.new(4)

// 3. Tuple destructure (compat с D91 spec примерами)
let (tx, rx) = Channel.new(4)

// 4. Record-access (когда нужны разные lifetimes)
let ch = Channel.new(4)
let tx = ch.tx
let rx = ch.rx

Capacity ≥ 1. Channel.new(0) сейчас панкует с "capacity must be >= 1" (Plan 44.1 Ф.3) — zero-capacity rendezvous каналы пока не реализованы.

Тип передачи (T) выводится из первого send/recv:

let { tx, rx } = Channel.new(8)
tx.send(42)         // T = int
let v = rx.recv()   // Option[int]

Явная аннотация — turbofish: Channel[str].new(8).


ChanWriter[T] API

МетодСигнатураСемантика
send(v T) -> boolBlocking send. Возвращает true если отправил; false если канал закрыт (не panic — Plan 30)
try_send(v T) -> boolNon-blocking. true если поместилось; false если буфер полон или канал закрыт
close() -> ()Закрывает writer-capability. Idempotent. С multi-writer (clone) — ref-counted: канал реально закрывается только когда все writers закрылись
clone() -> ChanWriter[T]Создаёт дополнительный writer на тот же буфер. writer_count++
is_closed() -> booltrue если буфер закрыт и у этого writer'а нет capability слать

send возвращает bool

test "channel: send после close возвращает false, не паникует" {
    let { tx, rx: _rx } = Channel.new(2)
    assert(tx.send(1))
    tx.close()
    assert(!tx.send(99))    // false: канал закрыт
}

Полезно для graceful shutdown без обёртки в try/catch:

fn produce(tx ChanWriter[Job], jobs []Job) {
    let mut i = 0
    while i < jobs.len() {
        if !tx.send(jobs[i]) {
            break               // consumer закрылся — выходим тихо
        }
        i = i + 1
    }
}

try_send — non-blocking

test "channel: try_send full buffer" {
    let { tx, rx } = Channel.new(2)
    assert(tx.try_send(10))
    assert(tx.try_send(20))
    assert(!tx.try_send(30))            // буфер полон
    assert(rx.recv().unwrap_or(-1) == 10)
    assert(tx.try_send(30))             // место освободилось
    tx.close()
}

clone — multi-writer

test "channel: fan-in — два writer'а, один reader" {
    let { tx, rx } = Channel.new(8)
    let tx2 = tx.clone()                // writer_count = 2
    let mut sum = 0
    supervised {
        spawn { tx.send(1);  tx.send(2);  tx.send(3);  tx.close() }
        spawn { tx2.send(10); tx2.send(20); tx2.send(30); tx2.close() }
        spawn {
            while let Some(v) = rx.recv() { sum = sum + v }
        }
    }
    assert(sum == 66)
}

Канал закрывается только когда все writers вызвали close(). Внутри — ref-count (writer_count): Channel.new инициализирует в 1, clone() инкрементирует, close() декрементирует. Когда достигает 0 — канал реально закрывается, rx.recv() начинает возвращать None.


ChanReader[T] API

МетодСигнатураСемантика
recv() -> Option[T]Blocking recv. Some(v) пока есть данные или канал открыт; None когда канал closed и буфер пуст
try_recv() -> Option[T]Non-blocking. None если буфер пуст (НЕ означает что канал закрыт — проверяй is_closed() отдельно)
len() -> intКоличество элементов в буфере сейчас
capacity() -> intCapacity, заданная в Channel.new
is_closed() -> booltrue если все writers закрылись

recvOption[T]

Closed-channel — не ошибка, валидный исход «источник закончился». Option[T] композируется с match, ?, ??, и идиоматичным while let-loop'ом.

test "channel: close + recv drain" {
    let { tx, rx } = Channel.new(4)
    tx.send(1)
    tx.send(2)
    tx.close()
    assert(rx.recv().unwrap_or(-1) == 1)
    assert(rx.recv().unwrap_or(-1) == 2)
    assert(rx.recv().is_none())             // drain'нули — None
    assert(rx.recv().is_none())             // повторно — тоже None
}

try_recv различает empty-open vs empty-closed

test "channel: try_recv различает empty-open от empty-closed через is_closed" {
    let { tx, rx } = Channel.new(4)
    assert(rx.try_recv().is_none())     // пустой открытый
    assert(!rx.is_closed())
    tx.close()
    assert(rx.try_recv().is_none())     // пустой закрытый — то же None
    assert(rx.is_closed())              // отличает через is_closed
}

len / capacity

test "channel: len и capacity" {
    let { tx, rx } = Channel.new(8)
    assert(rx.capacity() == 8)
    assert(rx.len() == 0)
    tx.send(1)
    tx.send(2)
    assert(rx.len() == 2)
    let _ = rx.recv()
    assert(rx.len() == 1)
    tx.close()
}

Идиомы

Drain через while let

test "channel: while-let drain pattern" {
    let { tx, rx } = Channel.new(4)
    tx.send(10)
    tx.send(20)
    tx.send(30)
    tx.close()
    let mut sum = 0
    while let Some(v) = rx.recv() {
        sum = sum + v
    }
    assert(sum == 60)
}

Это самый идиоматичный receiver-pattern. Цикл завершается автоматически, когда канал закрылся и буфер пуст — recv() вернёт None.

Producer/consumer

test "channel: producer-consumer pipeline" {
    let { tx, rx } = Channel.new(4)
    let mut sum = 0
    supervised {
        spawn {
            tx.send(1)
            tx.send(2)
            tx.send(3)
            tx.send(4)
            tx.send(5)
            tx.close()                  // важно: producer закрывает после finish
        }
        spawn {
            while let Some(v) = rx.recv() {
                sum = sum + v
            }
        }
    }
    assert(sum == 15)
}

Ping-pong

test "channel: ping-pong" {
    let { tx: tx1, rx: rx1 } = Channel.new(1)
    let { tx: tx2, rx: rx2 } = Channel.new(1)
    let mut result = 0
    supervised {
        spawn {
            tx1.send(10)
            let reply = rx2.recv()
            result = reply.unwrap_or(-1)
            tx1.close()
        }
        spawn {
            let msg = rx1.recv()
            tx2.send(msg.unwrap_or(0) * 2)
            tx2.close()
        }
    }
    assert(result == 20)
}

Fan-in (multi-writer)

Несколько spawn'ов производят, один потребляет.

let { tx, rx } = Channel.new(8)
supervised {
    for item in work_items {
        let worker_tx = tx.clone()      // каждому spawn'у — свой capability
        spawn {
            worker_tx.send(process(item))
            worker_tx.close()
        }
    }
    tx.close()                          // close корневого writer'а
    spawn {
        while let Some(v) = rx.recv() {
            collect(v)
        }
    }
}

Почему clone() обязателен: без него все spawn'ы захватили бы один tx через managed reference; close() первого закрыл бы канал для всех. С clone() каждый spawn держит свою capability и закрывает её независимо — канал закрывается только когда все worker_count + 1 writers вызвали close().

Relay (cross-channel pipeline)

fn relay(rx ChanReader[int], tx ChanWriter[int]) {
    while let Some(v) = rx.recv() {
        tx.send(v * 2)
    }
    tx.close()
}

test "channel: relay — Receiver → Sender pipeline через функцию" {
    let { tx: tx1, rx: rx1 } = Channel.new(4)
    let { tx: tx2, rx: rx2 } = Channel.new(4)
    tx1.send(1)
    tx1.send(2)
    tx1.send(3)
    tx1.close()
    relay(rx1, tx2)
    let mut s = 0
    while let Some(v) = rx2.recv() { s = s + v }
    assert(s == 12)
}

Передача в функции

Capability-types в сигнатурах делают API явным.

fn fill_channel(tx ChanWriter[int], values []int) {
    let mut i = 0
    while i < values.len() {
        tx.send(values[i])
        i = i + 1
    }
    tx.close()
}

fn drain_channel(rx ChanReader[int]) -> int {
    let mut sum = 0
    while let Some(v) = rx.recv() {
        sum = sum + v
    }
    sum
}

test "channel: Sender и Receiver передаются независимо" {
    let { tx, rx } = Channel.new(8)
    fill_channel(tx, [100, 200, 300])
    let s = drain_channel(rx)
    assert(s == 600)
}

Передать tx куда не нужно recv — type system гарантирует, что получатель не сможет прочитать (и наоборот).


select { ... }

Синтаксис и семантика

select-expr  = 'select' '{' NL* select-arm+ '}'
select-arm   = channel-arm | default-arm
channel-arm  = pattern '=' (recv-target | send-op) guard? '=>' arm-body NL*
recv-target  = expr                                 // bare rx
send-op      = expr '.' 'send' '(' expr ')'
guard        = 'if' expr
default-arm  = '_' '=>' arm-body NL*
arm-body     = block | stmt

Bootstrap-форма recv: Some(v) = rx => { ... } — bare rx без .recv(). Spec упоминает также pattern = rx.recv() форму; в текущем компиляторе работает только bare-форма.

Семантика (D94):

  1. Guard evaluationif <expr> перед стрелкой делает arm disabled когда false.
  2. Immediate check — все enabled arms проверяются в псевдослучайном порядке (Fisher-Yates). Если ≥1 готов — выполняется без park'а.
  3. Park — если ни один не готов и нет default: регистрирует waiter на каждый arm, паркует fiber.
  4. Wake — первый готовый arm будит fiber; остальные waiters unlinked. done-флаг предотвращает double-wake.
  5. Fairness — Fisher-Yates shuffle на каждой итерации (нет starvation).
  6. _ => ... (default) — если присутствует: шаг 2 всегда succeeds, fiber не паркуется.
  7. Все каналы закрыты + нет default → panic "select: all channels closed".
  8. Cancel (tok.cancel() от supervised(cancel:)) — отменяет все pending waiters; fiber просыпается, проверяет cancel_requested.

Recv arm

test "select single recv: value from channel" {
    let ch = Channel.new(1)
    let tx = ch.tx
    let rx = ch.rx
    supervised {
        spawn { tx.send(42) }
        spawn {
            let mut got = 0
            select {
                Some(v) = rx => { got = v }
            }
            assert(got == 42)
        }
    }
}

Send arm

test "select send arm: sends to channel with space" {
    let ch = Channel.new(1)
    let tx = ch.tx
    let rx = ch.rx
    let mut sent = 0
    select {
        tx.send(77) => { sent = 1 }
        _           => { sent = -1 }
    }
    assert(sent == 1)
    let opt = rx.recv()
    let mut got = 0
    match opt {
        Some(v) => { got = v }
        None    => { got = -1 }
    }
    assert(got == 77)
}

Guard arms

test "select guard: disabled arm skips to default" {
    let ch = Channel.new(1)
    ch.tx.send(10)
    let rx = ch.rx
    let enabled = false
    let mut branch = 0
    select {
        Some(v) = rx if enabled => { branch = v }
        _                       => { branch = -1 }
    }
    assert(branch == -1)         // arm disabled — default сработал
}

Guard — pre-condition. Если false, arm выключен ещё до проверки ready-state канала. Аналог if в Tokio select!. Go guard'ы не поддерживает.

Default arm

_ => { ... } — выполняется если ни один channel-arm не готов сейчас. Превращает select в non-blocking.

test "select recv with default: default when channel empty" {
    let ch = Channel.new(1)
    let rx = ch.rx
    let mut branch = 0
    select {
        Some(_) = rx => { branch = 1 }
        _            => { branch = 2 }     // ← default
    }
    assert(branch == 2)
}

Wildcard _ = rx

Wildcard в recv-target срабатывает на оба состояния: Some(v) и None (closed). Some(v) = rx срабатывает только на реальное значение.

test "Some arm skips closed+empty, picks open channel with data" {
    let ch1 = Channel.new(1)
    let ch2 = Channel.new(1)
    let tx1 = ch1.tx
    let tx2 = ch2.tx
    let rx1 = ch1.rx
    let rx2 = ch2.rx

    tx1.close()                  // ch1 closed+empty
    tx2.send(42)                 // ch2 has data

    let mut result = 0
    select {
        Some(v) = rx1 => { result = -1 }     // Some НЕ срабатывает на closed
        Some(v) = rx2 => { result = v  }     // ← выполнится
    }
    assert(result == 42)
}

test "wildcard fires immediately on closed+empty channel" {
    let ch = Channel.new(1)
    let tx = ch.tx
    let rx = ch.rx
    tx.close()

    let mut fired = false
    select {
        _ = rx => { fired = true }           // ← wildcard ловит closed
    }
    assert(fired)
}

Правило:

None = rx отдельным arm пока не реализован (Plan 31 §«Отличия от spec»); для дифференциации используйте _ = rx + match внутри тела arm'а или rx.is_closed() после recv-а.

Timeout через ChanReader.close_after

Специального timeout => arm'а нет — timeout это обычный recv-канал, создаваемый ChanReader.close_after(Duration).

import std.time.duration

test "select timeout: fires when channel stays empty" {
    let ch = Channel.new(1)
    let rx = ch.rx
    let mut branch = 0
    supervised {
        spawn {
            select {
                Some(_) = rx                                          => { branch = 1 }
                Some(_) = ChanReader.close_after(Duration.from_millis(50)) => { branch = 2 }
            }
        }
    }
    assert(branch == 2)
}

test "select timeout: data wins over timeout" {
    let ch = Channel.new(1)
    let tx = ch.tx
    let rx = ch.rx
    let mut branch = 0
    supervised {
        spawn {
            tx.send(99)
            select {
                Some(v) = rx                                           => { branch = v }
                Some(_) = ChanReader.close_after(Duration.from_millis(200)) => { branch = -1 }
            }
        }
    }
    assert(branch == 99)
}

ChanReader.close_after(d Duration) -> ChanReader[()] — реализован в std/concurrency/timer.nv как compiler-builtin (под капотом nova_chan_reader_close_after_ns(d.nanos)). Канал закрывается через d; первый recv() возвращает Some(()) после firing'а, потом None.

Type safety (Plan 65 revision 2026-05-18): ранее API назывался Time.after(int ms) — bare int (мс/мкс/сек?). Теперь — типизированный Duration. Migration: cargo run --bin migrate_plan65 -- --apply — переписывает literal-аргументы автоматически (см. nova-cli).

Edge-cases:

Performance: сейчас каждый вызов аллоцирует свежий uv_timer_t (~120 байт + syscall). Адекватно для idiomatic 10–100 concurrent timers. Custom timer-wheel для high-throughput (10k+ HTTP timeouts) — Plan 66.

Multi-arm fairness

test "select multi-arm: fairness — both channels get served" {
    let n = 50
    let ch1 = Channel.new(n)
    let ch2 = Channel.new(n)
    let tx1 = ch1.tx
    let tx2 = ch2.tx
    let rx1 = ch1.rx
    let rx2 = ch2.rx

    let mut from1 = 0
    let mut from2 = 0

    supervised {
        spawn {
            let mut i = 0
            while i < n {
                tx1.send(1)
                tx2.send(2)
                i += 1
            }
        }
        spawn {
            let mut total = 0
            while total < n * 2 {
                select {
                    Some(v) = rx1 => { from1 += 1; let _ = v }
                    Some(v) = rx2 => { from2 += 1; let _ = v }
                }
                total += 1
            }
        }
    }
    assert(from1 > 0)
    assert(from2 > 0)
    assert(from1 + from2 == n * 2)
}

Fisher-Yates shuffle на каждой итерации обеспечивает, что оба канала получают свою долю (Go использует тот же подход — select в Nova семантически совместим).


supervised(cancel:) + select

test "select: data wins supervised(cancel:) race" {
    let ch = Channel.new(1)
    let tx = ch.tx
    let rx = ch.rx
    let mut branch = 0
    let mut error_seen = false

    let tok = CancelToken.new()
    with Fail = handler Fail {
        fail(_msg) {
            error_seen = true
            interrupt ()
        }
    } {
        supervised(cancel: tok) {
            spawn {
                tx.send(77)
                Time.sleep(500)
                tok.cancel()
            }
            spawn {
                select {
                    Some(v) = rx                                           => { branch = v }
                    Some(_) = ChanReader.close_after(Duration.from_millis(200)) => { branch = -1 }
                }
            }
        }
    }
    assert(!error_seen)
    assert(branch == 77)
}

tok.cancel() отменяет все pending waiters в любом select-блоке внутри supervised(cancel: tok). Fiber просыпается, проверяет cancel_requested, и выходит из supervised-блока через структурную отмену (D75 / Plan 49).

Cancellation не ошибка — она не превращается в throw, не вызывает Fail-handler. Поведение симметрично Go context.Done(), но с типизированным CancelToken (D75) вместо error-канала.


Закрытие каналов

Идиома: defer tx.close()

Spec preference — defer гарантирует close при выходе из scope:

fn run_pipeline() Net -> () {
    let { tx, rx } = Channel[Job].new(10)
    defer tx.close()

    supervised {
        spawn { for j in jobs { tx.send(j) } }
        spawn { while let Some(j) = rx.recv() { process(j) } }
    }
}   // <-- tx.close() сработает гарантированно; rx.recv() в spawn'е получит None и завершится

Bootstrap-ограничение: defer + tuple-destructure

⚠️ Известная проблема: defer tx.close() не работает в сочетании с let (tx, rx) = Channel.new(N) или let { tx, rx } = Channel.new(N)defer эмитит setjmp-frame до объявления переменных, что ломает scope (Plan 25 G8, будет устранено когда внедрят open-coded defer).

Workaround: explicit tx.close() в конце функции, либо разделить destructure:

let ch = Channel.new(N)
let tx = ch.tx
let rx = ch.rx
defer tx.close()    // OK — tx объявлен напрямую
// ...

Auto-close на drop — нет

В отличие от Rust mpsc, Nova не имеет deterministic destructor'ов (managed heap, D6). GC соберёт sender «когда-нибудь» — это недетерминированно и сделало бы тесты flaky. Поэтому close() всегда explicit.

Idempotent

test "channel: close idempotent" {
    let { tx, rx } = Channel.new(2)
    tx.close()
    tx.close()                  // не error
    assert(rx.is_closed())
}

С multi-writer (clone) повторный close() одного writer'а не декрементирует writer_count повторно (idempotent per-instance).


Panic-сценарии

УсловиеСообщение
Channel.new(0)"capacity must be >= 1" (Plan 44.1 Ф.3)
select со всеми каналами closed + без default"select: all channels closed" (Plan 31 Ф.6)
ChanReader.close_after(<negative Duration>)panic с nanosecond-значением
select с arm_count > stackoverflow ловится до allocate'а — explicit panic

tx.send на closed-канал — не panic, возвращает false (Plan 30). rx.recv на closed+drained — не panic, возвращает None.


Bootstrap-ограничения

Что не работает / отложеноПлан
Dedicated None = rx arm (только _ = rx wildcard)Plan 31 followup
Channel.new(0) zero-capacity rendezvousPlan 44.2+
defer tx.close() + tuple/record destructurePlan 25 G8
pattern = rx.recv().recv()) форма в selectработает только bare pattern = rx
oneshot::channel<T> / watch::channel<T> / broadcast::channel<T>Plan 44.2
recv_many batch APIPlan 44.1 Ф.4 follow-up
Lock-free SPSC flavorPlan 50+ (Loom-verified)
tick_every(Duration) periodic tickerPlan 66
close_at(Monotonic) absolute deadlinePlan 65 Ф.13 (✅ реализовано)
Time-effect mock для deterministic timer-тестовPlan 65 Ф.10 (✅ реализовано)