You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
nim-eris/src/eris.nim

951 lines
31 KiB

# SPDX-FileCopyrightText: 2022 Endo Renberg
# SPDX-License-Identifier: Unlicense
## This module provides the basic procedures and types for ERIS encoding
## and decoding. Encoding and decoding always requires an `ErisStore`
## receiver object and store operations are asynchronous.
##
## `ErisStore` objects are implemented in additional modules.
runnableExamples:
# Encode the same text three times: once with the default mode and twice
# in convergent mode, then compare the resulting capabilities.
import eris/memory_stores
import std/[asyncdispatch, streams]
let
text = "Hello world!"
store = newMemoryStore()
(capA, _) = waitFor encode(store, newStringStream(text))
(capB, _) = waitFor encode(store, newStringStream(text), convergentMode)
(capC, _) = waitFor encode(store, newStringStream(text), convergentMode)
# The default mode is `uniqueMode`, which draws a random secret.
assert capA != capB
# Convergent encodings of equal content yield equal capabilities.
assert capB == capC
# Every capability decodes back to the same bytes.
assert waitFor(decode(store, capA)) == waitFor(decode(store, capB))
assert waitFor(decode(store, capB)) == waitFor(decode(store, capC))
## An `ErisStore` is implemented by `get` and `put` methods. Both
## operate on a variant of the `FutureBlock` type which holds a buffer,
## parameters, and callbacks.
runnableExamples:
# A store that logs the outcome of every `get` and `put` to stderr and
# forwards the request to another store.
import eris/memory_stores
import std/[asyncdispatch, streams]
type LoggerStore {.final.} = ref object of ErisStore
other: ErisStore
proc newLogger(s: ErisStore): LoggerStore =
LoggerStore(other: s)
method get(logger: LoggerStore; fut: FutureGet) =
# The callback runs when `fut` is completed.
fut.addCallback:
if fut.failed:
stderr.writeLine("failed to get ", fut.chunkSize.int, " byte chunk ", fut.`ref`)
else:
stderr.writeLine("got ", fut.chunkSize.int, " byte chunk ", fut.`ref`)
# Hand the request to the wrapped store.
get(logger.other, fut)
method put(logger: LoggerStore; fut: FuturePut) =
# The callback runs when `fut` is completed.
fut.addCallback:
if fut.failed:
stderr.writeLine("failed to put ", fut.chunkSize.int, " byte chunk ", fut.`ref`)
else:
stderr.writeLine("put ", fut.chunkSize.int, " byte chunk ", fut.`ref`)
# Hand the request to the wrapped store.
put(logger.other, fut)
let
store = newMemoryStore()
logger = newLogger(store)
(cap, _) = waitFor encode(logger, newStringStream("Hail ERIS!"))
discard waitFor decode(logger, cap)
import std/[asyncdispatch, hashes, math, sets, streams, strutils, sysrand]
import base32, eris/private/[chacha20/src/chacha20, blake2/blake2]
const erisCborTag* = 276
## CBOR tag for ERIS binary read capabilities.
## See the CBOR tag registry:
## https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml
type
Mode* = enum
## Type for specifying if an encoding shall be unique or convergent.
## See [section 2.3](https://eris.codeberg.page/spec/#section-2.3)
## for an explanation of encoding modes.
uniqueMode,
convergentMode,
ChunkSize* = enum
## Valid chunk sizes. The ordinal of each value is its size in bytes.
chunk1k = 1 shl 10 ## 1 KiB
chunk32k = 32 shl 10 ## 32 KiB
TreeLevel* = uint8
## Level of a chunk in the encoding tree; leaf chunks are at level 0.
Reference* = object ## Reference to an encrypted chunk.
bytes*: array[32, byte]
Key* = object ## Key for decrypting a chunk.
bytes*: array[32, byte]
Secret* = object ## Secret for salting a `Key`.
bytes*: array[32, byte]
Pair* {.packed.} = object
## A `Reference` followed by the `Key` of the referenced chunk.
r*: Reference
k*: Key
ErisCap* = object ## A capability for retrieving ERIS encoded data.
pair*: Pair ## Reference and key of the root chunk.
level*: uint8 ## Tree level of the root chunk.
chunkSize*: ChunkSize ## Size of every chunk of the encoding.
# Default parameter types for the untyped `key`, `secret`, `pair` and
# `cap` parameters used by procedures in this module.
using
key: Key
secret: Secret
pair: Pair
cap: ErisCap
# `chunkPairs` reinterprets chunk buffers as arrays of `Pair`, so a
# `Pair` must occupy exactly 64 bytes.
assert(sizeOf(Pair) == 64)
func arity*(bs: ChunkSize): int =
  ## Number of 64-byte reference-key pairs that fit into one chunk of
  ## size `bs`.
  ord(bs) div 64

func toByte*(bs: ChunkSize): uint8 =
  ## Byte code of `bs` as used by the binary capability encoding.
  result =
    case bs
    of chunk1k: 10'u8
    of chunk32k: 15'u8

func toChar*(bs: ChunkSize): char =
  ## Character code of `bs`: `'A'` for 1 KiB and `'F'` for 32 KiB.
  result =
    case bs
    of chunk1k: 'A'
    of chunk32k: 'F'

func mask(bs: ChunkSize; n: int): int =
  ## Offset of byte position `n` inside its chunk. Chunk sizes are powers
  ## of two, so the low bits of `n` are the offset.
  n and (ord(bs) - 1)
func `*`*[T: SomeUnsignedInt](x: T; bs: ChunkSize): T =
  ## Convenience function to multiply an integer by a `ChunkSize` value;
  ## implemented as a left shift by the base-2 logarithm of the size.
  result =
    case bs
    of chunk1k: x shl 10
    of chunk32k: x shl 15

func `div`*[T: SomeUnsignedInt](x: T; bs: ChunkSize): T =
  ## Convenience function to divide an integer by a `ChunkSize` value;
  ## implemented as a right shift by the base-2 logarithm of the size.
  result =
    case bs
    of chunk1k: x shr 10
    of chunk32k: x shr 15
func recommendedChunkSize*(dataLength: Natural): ChunkSize =
  ## Return the recommended `ChunkSize` for encoding data of the given
  ## length: 1KiB chunks for lengths below 16KiB, otherwise 32KiB chunks.
  ##
  ## The reasoning is that anything shorter than 16KiB is encoded in a
  ## tree no more than two chunks deep. Encoding 16KiB in a single 32KiB
  ## chunk wastes nearly half of that chunk but requires only one chunk
  ## to be fetched, whereas 16KiB-1 in 1KiB chunks would require 17 chunk
  ## requests.
  ##
  ## The behavior of this function is not guaranteed to remain constant
  ## and, because of storage efficiency and latency tradeoffs, may not
  ## yield the best choice for all applications.
  const threshold = 16 * 1024
  result = chunk32k
  if dataLength < threshold:
    result = chunk1k
proc `$`*(x: Reference|Key|Secret): string =
  ## Encode the 32 bytes of `x` to unpadded Base32.
  let chars = cast[array[32, char]](x.bytes)
  result = base32.encode(chars, pad = false)

proc `==`*(x, y: ErisCap): bool =
  ## Two capabilities are equal when their root references are equal.
  ## The key, level and chunk size are not compared.
  result = x.pair.r.bytes == y.pair.r.bytes
proc hash*(r: Reference): Hash =
  ## Reduce a `Reference` to a `Hash` value by taking its leading
  ## `sizeof(Hash)` bytes and finalizing them with `!$`.
  var leading: Hash
  copyMem(addr leading, unsafeAddr r.bytes[0], sizeof(leading))
  !$leading

proc hash*(cap): Hash {.inline.} =
  ## Hash a capability by its root reference.
  result = hash(cap.pair.r)
const chunkSizes = {chunk1k.int, chunk32k.int}
  ## Buffer lengths accepted by `reference`.

proc reference*[T: byte|char](data: openarray[T]): Reference =
  ## Derive the `Reference` for a 1KiB or 32KiB buffer: the 32-byte
  ## unkeyed Blake2b digest of `data`.
  assert(data.len in chunkSizes)
  var hasher: Blake2b
  init(hasher, 32)
  update(hasher, data)
  final(hasher, result.bytes)
proc bytes*(cap): seq[byte] =
  ## Binary encoding of the read-capability: the chunk-size byte, the
  ## root level, the 32-byte reference and the 32-byte key.
  result = newSeqOfCap[byte](2 + 32 + 32)
  result.add [toByte(cap.chunkSize), cap.level]
  result.add cap.pair.r.bytes
  result.add cap.pair.k.bytes

func toBase32*(cap): string =
  ## Unpadded Base32 text of the binary encoding of `cap`.
  let raw = cap.bytes
  result = base32.encode(cast[seq[char]](raw), pad = false)

proc `$`*(cap): string =
  ## Encode a ``ErisCap`` to standard URN form.
  ## https://inqlab.net/projects/eris/#_urn
  result = "urn:eris:"
  result.add toBase32(cap)
proc fromBase32*[T: Reference | Key | Secret](v: var T; s: string): bool =
  ## Decode the Base32 text `s` into `v`. Returns `true` only when `s`
  ## decodes to exactly as many bytes as `v` holds; otherwise `v` is left
  ## untouched and `false` is returned. Decoding errors are swallowed.
  try:
    var decoded = base32.decode(s)
    result = decoded.len == v.bytes.len
    if result:
      copyMem(addr v.bytes[0], addr decoded[0], v.bytes.len)
  except: discard
proc parseCap*[T: char|byte](bin: openArray[T]): ErisCap =
  ## Parse the 66-byte binary encoding of a read capability: one
  ## chunk-size byte, one level byte, a 32-byte reference and a 32-byte
  ## key.
  ##
  ## Raises `ValueError` when `bin` does not hold exactly 66 bytes or when
  ## the chunk-size byte is not recognized.
  # The length is checked with an exception rather than `assert`, so
  # malformed input cannot reach the raw copies below when assertions
  # are compiled out.
  if bin.len != 66:
    raise newException(ValueError, "invalid ERIS capability length")
  result.chunkSize =
    case bin[0].byte
    of chunk1k.toByte: chunk1k
    of chunk32k.toByte: chunk32k
    else: raise newException(ValueError, "invalid ERIS chunk size")
  # Every byte value is a representable level, so no range check is
  # needed here.
  result.level = uint8 bin[1]
  copyMem(addr result.pair.r.bytes[0], unsafeAddr bin[2], 32)
  copyMem(addr result.pair.k.bytes[0], unsafeAddr bin[34], 32)
proc parseErisUrn*(urn: string): ErisCap =
  ## Decode a URN to a ``ErisCap``.
  ##
  ## The URN must split on `':'` into at least `urn`, `eris` and a
  ## payload of 106 or more characters; only the first 106 payload
  ## characters are decoded. Raises `ValueError` otherwise.
  let parts = urn.split(':')
  if parts.len >= 3 and parts[0] == "urn" and parts[1] == "eris" and
      parts[2].len >= 106:
    return parseCap(base32.decode(parts[2][0..105]))
  raise newException(ValueError, "invalid ERIS URN encoding")
type
BlockStatus* = enum unknown, verified, plaintext
## State of a chunk buffer. `crypto` toggles a chunk between `verified`
## and `plaintext`; `unknown` is the zero value.
FutureBlock* = ref FutureBlockObj
FutureBlockObj = object of RootObj
## Common state of a pending chunk request.
`ref`: Reference
## Reference of the chunk held or requested.
error*: ref Exception
## Set when the request has failed.
buffer*: seq[byte]
# I considered making this an array but alignment
# of the final object on the heap would be wack
callbacks: seq[proc () {.closure, gcsafe.}]
## Callbacks to run on completion; the last one added runs first.
chunkSize: ChunkSize
status: BlockStatus
FutureGet* = ref FutureGetObj
FutureGetObj {.final.} = object of FutureBlockObj
## Request to get a chunk from a store.
discard
FuturePut* = ref FuturePutObj
FuturePutObj {.final.} = object of FutureBlockObj
## Request to put a chunk into a store.
discard
proc assertAtEnd(blk: FutureBlock) =
## Check that no callbacks remain to be run on `blk`.
doAssert blk.callbacks.len == 0
proc assertIdle(blk: FutureBlock) =
## Check that `blk` has no pending callbacks and no recorded error,
## i.e. that it is free to be used for a new request.
doAssert blk.callbacks.len == 0
doAssert blk.error.isNil
proc assertVerified*(blk: FutureBlock) =
## Check that `blk` is in the `verified` state; the failure message
## carries the Base32 reference of the chunk.
doAssert blk.status == verified, $blk.`ref`
proc addCallback*(blk: FutureBlock; cb: proc () {.closure, gcsafe.}) =
  ## Register `cb` on `blk`. When `complete` is called on `blk` the
  ## callbacks run last-in-first-out, so the most recently added callback
  ## runs first.
  add(blk.callbacks, cb)
template withFuture[T](blk: FutureBlock; fut: Future[T]; body: untyped): untyped =
## Bridge the completion of `blk` to `fut`. `blk` must be idle.
## If `blk` fails then `fut` fails with the same error; otherwise `body`
## runs and any exception it raises fails `fut`.
assertIdle blk
blk.addCallback do:
if blk.failed: fail(fut, blk.error)
else:
try:
body
# A void future is completed here; for any other `T` the `body` is
# expected to complete `fut` with a value itself.
when T is void:
complete(fut)
except: fail(fut, getCurrentException())
template asFuture*(blk: FutureBlock; body: untyped): untyped =
## Produce a `Future[void]` that follows `blk`, which must be idle.
## When `blk` completes without error `body` is run and the future is
## completed; an error on `blk` or an exception from `body` fails it.
assertIdle blk
var fut = newFuture[void]("asFuture")
blk.addCallback do:
if not blk.error.isNil: fail(fut, blk.error)
else:
try:
body
complete(fut)
except: fail(fut, getCurrentException())
# The template evaluates to the new future.
fut
template asFuture*(blk: FutureBlock): untyped =
## Produce a `Future[void]` that completes when `blk` completes and
## fails with the error of `blk` when `blk` fails. `blk` must be idle.
assertIdle blk
var fut = newFuture[void]("asFuture")
blk.addCallback do:
if blk.failed: fail(fut, blk.error)
else: complete(fut)
# The template evaluates to the new future.
fut
proc addCallback*(fut: Future; blk: FutureBlock; cb: proc () {.closure, gcsafe.}) =
  ## Run `cb` once `fut` has finished successfully. A failure of `fut`,
  ## or an exception raised by `cb`, fails `blk` with that error.
  fut.addCallback do:
    if not fut.failed:
      try: cb()
      except: fail(blk, getCurrentException())
    else:
      fail(blk, fut.error)
proc newFutureGet*(bs: ChunkSize): FutureGet =
  ## Create a get request with a zeroed buffer of `bs` bytes and no
  ## reference set.
  new result
  result.buffer = newSeq[byte](ord bs)
  result.chunkSize = bs

proc newFutureGet*(`ref`: Reference; bs: ChunkSize): FutureGet =
  ## Create a get request for the chunk `ref` with a zeroed buffer of
  ## `bs` bytes.
  new result
  result.`ref` = `ref`
  result.buffer = newSeq[byte](ord bs)
  result.chunkSize = bs

proc newFuturePut*(bs: ChunkSize): FuturePut =
  ## Create a put request with a zeroed `plaintext` buffer of `bs` bytes
  ## and no reference set.
  new result
  result.buffer = newSeq[byte](ord bs)
  result.status = plaintext
  result.chunkSize = bs

proc newFuturePut*(`ref`: Reference; bs: ChunkSize): FuturePut =
  ## Create a put request for `ref` with a zeroed `plaintext` buffer of
  ## `bs` bytes.
  new result
  result.`ref` = `ref`
  result.buffer = newSeq[byte](ord bs)
  result.status = plaintext
  result.chunkSize = bs
proc newFuturePut*[T: byte|char](buffer: openarray[T]): FuturePut =
  ## Create a put request holding a copy of `buffer`, which must be
  ## exactly 1KiB or 32KiB long. The reference is derived from `buffer`
  ## and the chunk is marked `verified`.
  let bs =
    case buffer.len
    of chunk1k.int: chunk1k
    of chunk32k.int: chunk32k
    else: raiseAssert "invalid buffer size"
  result = FuturePut(
    `ref`: reference(buffer),
    buffer: newSeq[byte](buffer.len),
    chunkSize: bs,
    status: verified)
  copyMem(addr result.buffer[0], unsafeAddr buffer[0], result.buffer.len)
func failed*(blk: FutureBlock): bool {.inline.} =
  ## True when an error has been recorded on `blk`.
  blk.error != nil

proc `ref`*(blk: FutureBlock): Reference {.inline.} =
  ## The reference of the chunk held or requested by `blk`.
  result = blk.`ref`

proc `ref=`*(blk: FutureBlock; r: Reference) {.inline.} =
  ## Set the reference of `blk`.
  blk.`ref` = r

func chunkSize*(blk: FutureBlock): ChunkSize {.inline.} =
  ## The chunk size `blk` was created for.
  result = blk.chunkSize

func buffer*(blk: FutureBlock): pointer {.inline.} =
  ## Raw pointer to the first byte of the chunk buffer, whose length
  ## must match the chunk size of `blk`.
  assert blk.buffer.len == blk.chunkSize.int
  result = unsafeAddr blk.buffer[0]
func verified*(blk: FutureBlock): bool {.inline.} =
  ## True when the buffer of `blk` has been checked against its
  ## reference. This only inspects `blk`; it never changes its status.
  blk.status == BlockStatus.verified
proc verify*(blk: FutureBlock): bool {.discardable.} =
  ## Check that the 32-byte Blake2b digest of the buffer of `blk` equals
  ## its reference. On success the chunk is marked `verified` and `true`
  ## is returned; otherwise an `IOError` is recorded on `blk` and `false`
  ## is returned.
  assert not blk.verified, "FutureBlock already verified or improperly initialized"
  var
    digest: Reference
    hasher: Blake2b
  hasher.init(32)
  hasher.update(blk.buffer)
  hasher.final(digest.bytes)
  if digest.bytes == blk.`ref`.bytes:
    blk.status = verified
    result = true
  else:
    blk.error = newException(IOError, "ERIS chunk does not match reference")
    result = false

proc verify*(blk: FutureBlock; `ref`: Reference): bool {.discardable, deprecated.}=
  ## Deprecated variant of `verify`; `ref` must equal the reference
  ## already set on `blk`.
  assert blk.`ref` == `ref`
  result = verify(blk)
proc complete*(blk: FutureBlock) =
  ## Complete a `FutureBlock` by running its callbacks, most recently
  ## added first, until none remain. An exception raised by a callback is
  ## recorded as the error of `blk` and the remaining callbacks still run.
  assert blk.callbacks.len > 0
  while true:
    let cb = pop blk.callbacks
    try: cb()
    except Exception as e: blk.error = e
    if blk.callbacks.len == 0: break
proc complete*(blk: FutureGet; src: pointer; len: Natural; status = unknown) =
  ## Complete a `Get` `FutureBlock` with the chunk at `src`. The `len`
  ## bytes are copied into the buffer of `blk` and, unless `status` says
  ## they are already verified, checked against the reference.
  blk.status = status
  assert len == blk.buffer.len
  copyMem(addr blk.buffer[0], src, len)
  if status != BlockStatus.verified:
    discard verify(blk)
  complete(blk)

proc complete*(blk: FutureGet; buf: sink seq[byte]; status = unknown) =
  ## Complete a `Get` `FutureBlock` by taking ownership of `buf`, which
  ## must have the length of the current buffer. The chunk is checked
  ## against the reference unless `status` is `verified`.
  blk.status = status
  doAssert buf.len == blk.buffer.len
  blk.buffer = move buf
  if status != BlockStatus.verified:
    discard verify(blk)
  complete(blk)

proc complete*[T: byte|char](blk: FutureGet; buf: openarray[T]; status = unknown) {.inline.} =
  ## Complete a `Get` `FutureBlock` with a copy of `buf`.
  let src = unsafeAddr buf[0]
  complete(blk, src, buf.len, status)
proc fail*(blk: FutureBlock; e: ref Exception) =
  ## Record `e` as the error of `blk` and run its callbacks.
  blk.error = e
  blk.complete()

proc copy*(blk: FutureBlock; dst: pointer; len: Natural) =
  ## Copy the first `len` bytes of chunk data out of a `FutureBlock` to
  ## `dst`. The chunk must be verified and `len` must not exceed the
  ## buffer length. Raises the error of `blk` if one is recorded.
  assertVerified blk
  doAssert len <= blk.buffer.len
  if not blk.error.isNil:
    raise blk.error
  copyMem(dst, addr blk.buffer[0], len)

proc notFound*(blk: FutureBlock; msg = "") {.inline.} =
  ## Fail `blk` with a `KeyError` exception carrying `msg`.
  blk.fail newException(KeyError, msg)
proc moveBytes*(blk: FutureBlock): owned seq[byte] =
  ## Move the `seq[byte]` out of a `FutureBlock`, leaving `blk` without
  ## its data. This is only safe to use in the first callback added to
  ## `blk` because that callback is the last one to be called.
  assert(blk.buffer.len == blk.chunkSize.int)
  result = move(blk.buffer)
proc toBytes*(blk: FutureBlock): owned seq[byte] =
  ## Return a copy of the chunk data of `blk`; `blk` keeps its buffer.
  ## An empty buffer, e.g. after `moveBytes`, yields an empty sequence
  ## instead of an index failure.
  result = blk.buffer
proc crypto*(blk: FutureBlock; key; level: TreeLevel) =
  ## Apply the ChaCha20 keystream for `key` in place to the buffer of
  ## `blk`, with a nonce whose first byte is `level` and whose remaining
  ## bytes are zero. A `verified` chunk becomes `plaintext` and a
  ## `plaintext` chunk becomes `verified`; any other status is a
  ## programming error.
  var nonce: Nonce
  nonce[0] = level.uint8
  discard chacha20(key.bytes, nonce, 0, blk.buffer, blk.buffer)
  blk.status =
    case blk.status
    of verified: plaintext
    of plaintext: verified
    else: raiseAssert "invalid chunk status"
proc encryptLeafFuture(secret; blk: FutureBlock): Pair =
  ## Encrypt the plaintext leaf chunk in `blk` in place and return its
  ## reference-key pair. The key is the Blake2b digest of the plaintext
  ## keyed with `secret`; the reference is the unkeyed digest of the
  ## ciphertext and is also stored on `blk`.
  assert blk.status == plaintext, $blk.status
  var hasher: Blake2b
  # Key: keyed digest of the plaintext.
  hasher.init(32, secret.bytes)
  hasher.update(blk.buffer)
  hasher.final(result.k.bytes)
  # Leaves are always encrypted at level 0.
  crypto(blk, result.k, 0)
  # Reference: digest of the ciphertext.
  hasher.init(32)
  hasher.update(blk.buffer)
  hasher.final(result.r.bytes)
  blk.`ref` = result.r
proc encryptNodeFuture(level: TreeLevel; blk: FutureBlock): Pair =
  ## Encrypt the plaintext node chunk in `blk` in place for tree level
  ## `level` and return its reference-key pair. The key is the unkeyed
  ## Blake2b digest of the plaintext; the reference is the digest of the
  ## ciphertext and is also stored on `blk`.
  assert blk.status == plaintext
  var hasher: Blake2b
  # Key: digest of the plaintext.
  hasher.init(32)
  hasher.update(blk.buffer)
  hasher.final(result.k.bytes)
  crypto(blk, result.k, level)
  # Reference: digest of the ciphertext.
  hasher.init(32)
  hasher.update(blk.buffer)
  hasher.final(result.r.bytes)
  blk.`ref` = result.r
proc unpaddedLen(buf: openarray[byte]): int {.inline.} =
  ## Length of the content of a padded chunk: the index of the `0x80`
  ## marker that is followed only by `0x00` bytes. Raises `IOError` when
  ## the buffer does not end in such padding.
  for i in countdown(buf.high, 0):
    case buf[i]
    of 0x00: continue
    of 0x80: return i
    else: break
  raise newException(IOError, "invalid ERIS chunk padding")
proc unpaddedLen(blk: FutureBlock): int {.inline.} =
  ## Length of the content of the padded chunk held by `blk`.
  result = unpaddedLen(blk.buffer)
iterator chunkPairs*(blk: seq[byte]): Pair =
  ## Iterate the reference-key pairs stored in a decrypted node chunk, up
  ## to and including the pair that holds the last non-zero byte.
  ## An empty or all-zero buffer yields nothing.
  var n = blk.high
  # Skip the zero fill at the end; stop at the start of the buffer so an
  # all-zero chunk cannot run the index below zero.
  while n >= 0 and blk[n] == 0x00: dec n # TODO: schnell machen
  if n >= 0:
    # Each pair is 64 bytes, so byte index `n` lies in pair `n shr 6`.
    let pairs = cast[ptr UncheckedArray[Pair]](blk[0].unsafeAddr)
    for i in 0..(n shr 6): yield pairs[i]
type
Operation* = enum Get, Put
## Kinds of store operation.
Operations* = set[Operation]
ErisStore* = ref ErisStoreObj ## Object for interfacing ERIS storage.
ErisStoreObj* = object of RootObj
## Base object of all stores; behavior is supplied by overriding the
## `get`, `put`, `hasBlock`, `close` and `id` methods.
discard
# Untyped `store` parameters below are of type `ErisStore`.
using store: ErisStore
method id*(store): string {.base.} =
  ## Get an `id` for `store`. Should be unique within a running program.
  ## The base implementation renders the address of the store object.
  result = "ErisStore@0x"
  result.add toHex(cast[ByteAddress](store[].unsafeAddr))

proc `$`*(store): string =
  ## The `id` of `store`.
  result = store.id()
method get*(store; blk: FutureGet) {.base.} =
  ## Method for getting a chunk from a ``Store``.
  ## The result is not decrypted but should be verified.
  ## The base implementation fails `blk` with a `KeyError`.
  notFound(blk, "get not implemented for this ErisStore")

proc get*(store; `ref`: Reference; blk: FutureGet) =
  ## Get the chunk `ref` from `store` into `blk`. `blk` must already
  ## carry `ref` and must not be marked verified.
  assert `ref` == blk.`ref`, $blk.`ref`
  assert blk.status != verified
  store.get(blk)
proc getBlock(store: ErisStore, `ref`: Reference; bs: ChunkSize): Future[seq[byte]] =
  ## Get the still-encrypted chunk `ref` of size `bs` from `store`.
  var
    pending = newFuture[seq[byte]]("eris.getBlock")
    chunk = newFutureGet(`ref`, bs)
  chunk.withFuture(pending) do:
    complete(pending, chunk.moveBytes)
  get(store, chunk)
  result = pending

proc getBlock*(store: ErisStore, `ref`: Reference): Future[seq[byte]] {.async, deprecated.} =
  ## This requests a small chunk and falls back to a large chunk if that
  ## fails for any reason. Do not use it.
  try:
    result = await getBlock(store, `ref`, chunk1k)
  except:
    result = await getBlock(store, `ref`, chunk32k)
proc get*(store; pair: Pair; level: TreeLevel; bs: ChunkSize): Future[seq[byte]] =
  ## Get the chunk referenced by `pair` from `store` and decrypt it with
  ## the key of `pair` for tree level `level`. The future yields the
  ## plaintext of the chunk.
  var
    plain = newFuture[seq[byte]]("eris.get")
    chunk = newFutureGet(pair.r, bs)
  chunk.withFuture(plain):
    # Runs once the store has delivered a verified chunk.
    assertAtEnd chunk
    assertVerified chunk
    crypto(chunk, pair.k, level)
    complete(plain, move chunk.buffer)
  get(store, pair.r, chunk)
  result = plain
method put*(store; blk: FuturePut) {.base.} =
  ## Method for putting an encrypted chunk to a ``Store``.
  ## The base implementation fails `blk` with an `IOError`.
  blk.fail newException(IOError, "put not implemented for this ErisStore")

method hasBlock*(store; r: Reference; bs: ChunkSize): Future[bool] {.base.} =
  ## Test if `store` has a chunk for a `Reference`.
  ## For some stores this is cheaper than retrieving a chunk.
  ## The base implementation is only available outside of release builds,
  ## where it fetches the chunk and reports whether it verified.
  when defined(release):
    raiseAssert "hasBlock not implemented for this store"
  else:
    var
      answer = newFuture[bool]("hasBlock")
      probe = newFutureGet(r, bs)
    probe.addCallback do:
      answer.complete(probe.status == verified)
    get(store, r, probe)
    answer

method close*(store) {.base.} =
  ## Method for closing a `Store`. The base implementation does nothing.
  discard
type DiscardStore* {.final.} = ref object of ErisStoreObj
  ## A store that keeps nothing.

method hasBlock(s: DiscardStore; r: Reference; bs: ChunkSize): Future[bool] =
  ## A `DiscardStore` never has any chunk.
  let answer = newFuture[bool]("DiscardStore.hasBlock")
  answer.complete(false)
  answer

method put(s: DiscardStore; blk: FuturePut) =
  ## Complete `blk` immediately without storing anything.
  complete(blk)

proc newDiscardStore*(): DiscardStore =
  ## Create an ``ErisStore`` that discards writes and fails to read.
  DiscardStore()
type
ErisStream* = ref ErisStreamObj ## An object representing data streams.
ErisStreamObj = object
store*: ErisStore ## Store that chunks are fetched from.
pos: BiggestUInt ## Current read position in bytes.
leaves: seq[Pair] ## Pairs of the leaf chunks; filled on first use.
cap: ErisCap ## Capability being read.
stopped: bool ## Reported by `atEnd`.
futGet: FutureGet ## Reused request holding the most recently loaded leaf.
proc buf(s: ErisStream): var seq[byte] {.inline.} =
  ## The buffer of the leaf chunk most recently loaded by `s`.
  s.futGet.buffer

proc newErisStream*(store; cap): owned ErisStream =
  ## Open a new stream for reading ERIS data.
  new result
  result.store = store
  result.cap = cap

proc close*(s: ErisStream) =
  ## Release the resources of an ``ErisStream``: the store reference, the
  ## position and the list of leaves are reset.
  reset(s.store)
  reset(s.pos)
  reset(s.leaves)

proc cap*(s: ErisStream): ErisCap =
  ## The capability that `s` reads.
  result = s.cap
proc getLeaves(store: ErisStore; cap: ErisCap): Future[seq[Pair]] {.async.} =
  ## Collect the reference-key pairs of all leaf chunks of `cap` in
  ## stream order by walking the tree from the root. For a level-0
  ## capability the root pair is the only leaf.
  if cap.level == 0: return @[cap.pair]
  else:
    # Reserve room for one node's worth of pairs and let the sequence
    # grow. `cap.level` comes from the capability and may be anything up
    # to 255, so sizing the reservation as `arity ^ level` can overflow
    # or demand gigabytes before a single chunk has been fetched.
    var leaves = newSeqOfCap[Pair](cap.chunkSize.arity)
    proc expand(level: TreeLevel; pair: Pair) {.async.} =
      # Fetch and decrypt the node, then descend depth-first.
      var blk = await get(store, pair, level, cap.chunkSize)
      if level == 1:
        # The children of a level-1 node are leaves.
        for p in blk.chunkPairs:
          leaves.add(p)
      else:
        for p in blk.chunkPairs:
          await expand(level.pred, p)
    await expand(cap.level, cap.pair)
    return leaves
proc init(s: ErisStream) {.async.} =
  ## Prepare `s` for reading: allocate a fresh chunk request and, if not
  ## yet known, fetch the list of leaves.
  s.futGet = newFutureGet(s.cap.chunkSize)
  if s.leaves.len == 0:
    let found = await getLeaves(s.store, s.cap)
    s.leaves = found

proc atEnd*(s: ErisStream): bool =
  ## Check if an ``ErisStream`` is positioned at its end.
  ## May return false negatives.
  result = s.stopped

proc setPosition*(s: ErisStream; pos: BiggestUInt) =
  ## Seek an ``ErisStream`` and clear its end-of-stream flag.
  assert pos >= 0
  s.stopped = false
  s.pos = pos

proc getPosition*(s: ErisStream): BiggestUInt =
  ## Return the position of an ``ErisStream``.
  result = s.pos
proc loadBlock*(s: ErisStream, bNum: BiggestUInt): Future[void] =
## Load leaf number `bNum` into the chunk buffer of `s` and decrypt it.
## The returned future completes once the plaintext is available.
assertIdle s.futGet
s.futGet.`ref` = s.leaves[bNum].r
result = s.futGet.asFuture:
# Runs after the store has delivered the chunk.
assert s.futGet.callbacks.len == 0
assertVerified s.futGet
# Leaf chunks are decrypted at level 0.
crypto(s.futGet, s.leaves[bNum].k, 0)
# The request object is reused, so clear the status left by the previous load.
s.futGet.status = BlockStatus.unknown
get(s.store, s.leaves[bNum].r, s.futGet)
proc length*(s: ErisStream): Future[BiggestUInt] {.async.} =
  ## Determine the length of an ``ErisStream``: the size of all leaves
  ## before the last one plus the unpadded length of the last leaf, which
  ## is fetched and decrypted for this purpose.
  await init(s)
  let precedingBytes = s.leaves.len.pred.BiggestUInt * s.cap.chunkSize
  await loadBlock(s, s.leaves.high.BiggestUInt)
  assertIdle s.futGet
  assert s.futGet.status == plaintext
  let tail = s.futGet.buffer.unpaddedLen.BiggestUInt
  result = precedingBytes + tail
proc readBuffer*(s: ErisStream; buffer: pointer; bufLen: int): Future[int] {.async.} =
## Read up to `bufLen` bytes from the current position of `s` into the
## memory at `buffer`, advance the position by the number of bytes
## copied, and return that number.
if s.leaves == @[]: await init(s)
var
bNum = s.pos div s.cap.chunkSize
buf = cast[ptr UncheckedArray[byte]](buffer)
bufOff = 0
while bufOff < bufLen and bNum < s.leaves.len.BiggestUInt:
await loadBlock(s, bNum)
# Offset of the stream position within the chunk just loaded.
let blkOff = s.cap.chunkSize.mask s.pos.int
var n = s.buf.len
if bNum == s.leaves.high.BiggestUInt:
# Only the last leaf is padded; `n` becomes its content length.
n = s.buf.unpaddedLen
# NOTE(review): `blkOff` is masked to be below the chunk size, so this
# branch looks unreachable while the buffer is chunk-sized — confirm.
if s.buf.high < blkOff:
s.stopped = true
break
# Copy no more than what the caller asked for or the chunk still holds.
n = min(bufLen - bufOff, n - blkOff)
copyMem(unsafeAddr(buf[bufOff]), addr (s.buf[blkOff]), n)
inc(bNum)
inc(bufOff, n)
inc(s.pos, n)
return bufOff
proc read*(s: ErisStream; size: int): Future[seq[byte]] {.async.} =
  ## Read up to `size` bytes from the current position of `s`. The result
  ## is shorter than `size` when the stream ends first.
  var buf = newSeq[byte](size)
  # Taking the address of the first element of an empty buffer would
  # fail, and a zero-byte read has nothing to fetch anyway.
  if size > 0:
    let n = await s.readBuffer(buf[0].addr, buf.len)
    buf.setLen(n)
  return buf
proc readLine*(s: ErisStream): Future[string] {.async.} =
## Read characters starting in the chunk that contains the current
## position until a newline character or the end of the content, and
## return them without the newline.
# NOTE(review): the position of `s` is never advanced here, so repeated
# calls appear to return the same line — confirm intended behavior.
# TODO: buffer a chunk.
if s.leaves == @[]: await init(s)
var
line = ""
bNum = s.pos div s.cap.chunkSize
line.setLen(0)
while true:
await loadBlock(s, bNum)
var
# NOTE(review): the start offset is derived from the length of `line`,
# not from the stream position, so reading starts at the beginning of
# the chunk even when the position lies inside it — confirm.
blkOff = s.cap.chunkSize.mask line.len
n = s.buf.len
if bNum == s.leaves.high.BiggestUInt:
# Only the last leaf is padded; `n` becomes its content length.
n = s.buf.unpaddedLen
for i in blkOff..<n:
let c = s.buf[i].char
if c in Newlines:
return line
line.add(c)
inc(bNum)
# A chunk that is not full of content is the last one.
if n < s.cap.chunkSize.int:
return line
proc readDataStr*(s: ErisStream; buffer: var string; slice: Slice[int]): Future[int] =
  ## Read from `s` into the characters `slice.a..slice.b` of `buffer` and
  ## yield the number of bytes read.
  let dst = addr buffer[slice.a]
  result = readBuffer(s, dst, slice.b + 1 - slice.a)
proc readAll*(s: ErisStream): Future[seq[byte]] {.async.} =
  ## Reads all data from the specified ``ErisStream``, starting at its
  ## current position. Yields an empty sequence when the content is empty
  ## or the position is already at or past the end.
  let
    total = await s.length
    pos = getPosition(s)
  # Nothing left to read: avoid the unsigned underflow of `total - pos`
  # and taking the address of the first element of an empty buffer.
  if total <= pos:
    return newSeq[byte]()
  var buf = newSeq[byte](int(total - pos))
  let n = await readBuffer(s, addr buf[0], buf.len)
  assert n == buf.len
  result = buf
proc dump*(s: ErisStream; stream: Stream) {.async.} =
## Write the content of `s`, from its current position to the end, to
## `stream`, advancing the position as it goes.
if s.leaves == @[]: await init(s)
var
bNum = s.pos div s.cap.chunkSize
bufOff = 0
while bNum < s.leaves.len.BiggestUInt:
await loadBlock(s, bNum)
var
# Offset of the stream position within the chunk just loaded.
blkOff = s.cap.chunkSize.mask s.pos.int
n = s.buf.len
if bNum == s.leaves.high.BiggestUInt:
# Only the last leaf is padded; `n` becomes its content length.
n = s.buf.unpaddedLen
# NOTE(review): `blkOff` is masked to be below the chunk size, so this
# branch looks unreachable while the buffer is chunk-sized — confirm.
if s.buf.high < blkOff:
s.stopped = true
break
# Skip the part of the chunk that lies before the position.
n.dec blkOff
writeData(stream, addr (s.buf[blkOff]), n)
inc(bNum)
inc(bufOff, n)
inc(s.pos, n)
proc decode*(store; cap): Future[seq[byte]] =
  ## Asynchronously decode ``cap`` from ``store`` by reading a fresh
  ## stream over it to the end.
  let stream = newErisStream(store, cap)
  result = readAll(stream)
type
ErisIngest* = ref ErisIngestObj
ErisIngestObj = object
## An object for ingesting data into a store
store*: ErisStore ## Store that chunks are put to.
futPut: FuturePut ## Reused request holding the chunk being filled.
tree: seq[seq[Pair]] ## Uncommitted pairs per tree level; index 0 holds leaf pairs.
secret*: Secret ## Secret used when deriving leaf keys.
pos: BiggestUInt ## Append position in bytes.
chunkSize: ChunkSize
invalid: bool ## Set by `cap`; cleared by `reinit`.
proc buffer(ingest: ErisIngest): var seq[byte] {.inline.} =
  ## The buffer of the chunk currently being filled by `ingest`.
  ingest.futPut.buffer

proc newErisIngest*(store: ErisStore; chunkSize = chunk32k; secret: Secret): ErisIngest =
  ## Create a new `ErisIngest` object that encodes into `store` with the
  ## given chunk size and convergence secret.
  new result
  result.store = store
  result.futPut = newFuturePut(chunkSize)
  result.tree = newSeqOfCap[seq[Pair]](8)
  result.secret = secret
  result.chunkSize = chunkSize

proc newErisIngest*(store: ErisStore; chunkSize = chunk32k; mode = uniqueMode): ErisIngest =
  ## Create a new `ErisIngest` object. If `mode` is `uniqueMode` then a random
  ## convergence secret will be generated using entropy from the operating system.
  ## For `convergentMode` a zero-secret will be used and the encoding will be
  ## deterministic and reproducible.
  var secret: Secret
  case mode
  of uniqueMode: doAssert urandom(secret.bytes)
  of convergentMode: discard
  result = newErisIngest(store, chunkSize, secret)
proc reinit*(ingest: ErisIngest) =
  ## Re-initialize an `ErisIngest` object so that it can be used for a new
  ## encoding: the pending tree is emptied, the position returns to zero
  ## and the invalid flag is cleared.
  ##
  ## The secret is zeroed as well; assign `ingest.secret` afterwards if
  ## the next encoding shall not use the zero secret.
  for nodes in ingest.tree.mitems: nodes.setLen(0)
  # The chunk buffer must be `plaintext` for the next commit, exactly as
  # `newFuturePut` leaves it. Resetting the status to its zero value
  # (`unknown`) would make the next commit fail in `crypto`.
  ingest.futPut.status = plaintext
  reset ingest.secret.bytes
  ingest.pos = 0
  ingest.invalid = false
proc reopen*(ingest: ErisIngest; cap: ErisCap) {.async.} =
  ## Re-open an `ErisIngest` for appending to an `ErisCap`.
  ##
  ## The right-most path of the tree of `cap` is fetched from the store
  ## of `ingest` and unfolded into the pending tree, and the content of
  ## the last leaf is loaded back into the chunk buffer. `ingest` is
  ## re-initialized first, which also zeroes its secret.
  # NOTE(review): the position is set to the unpadded length of the last
  # leaf only, not to the total length of the reopened content — confirm
  # whether `position` is meant to be absolute after a reopen.
  ingest.chunkSize = cap.chunkSize
  ingest.reinit()
  ingest.tree.setLen(succ cap.level)
  ingest.tree[cap.level].add(cap.pair)
  if cap.level > 0:
    for level in countdown(cap.level, TreeLevel 1):
      # Replace the right-most node of this level by its children.
      var
        pair = ingest.tree[level].pop
        blk = await get(ingest.store, pair, level, cap.chunkSize)
      for child in blk.chunkPairs: ingest.tree[pred level].add(child)
  let pair = ingest.tree[0].pop
  # Leaf chunks are always encrypted at level 0, whatever the level of
  # the root is.
  var blk = await get(ingest.store, pair, TreeLevel 0, cap.chunkSize)
  # Measure the padding of the leaf that was just fetched.
  let n = blk.unpaddedLen
  copyMem(addr ingest.buffer[0], addr blk[0], n)
  ingest.pos = ingest.pos + n.BiggestUInt
proc reopenErisIngest*(store: ErisStore; cap: ErisCap; secret: Secret): Future[ErisIngest] {.async.} =
  ## Re-open a `ErisCap` for appending, using `secret` as the convergence
  ## secret for the chunks appended afterwards.
  # Constructing with the secret avoids drawing a random one that would
  # be discarded anyway.
  var ingest = newErisIngest(store, cap.chunkSize, secret)
  await reopen(ingest, cap)
  # `reopen` re-initializes the ingest, which zeroes its secret, so the
  # secret has to be installed after reopening.
  ingest.secret = secret
  return ingest
proc reopenErisIngest*(store: ErisStore; cap: ErisCap; mode = uniqueMode): Future[ErisIngest] =
  ## Re-open a `ErisCap` for appending. In `uniqueMode` a random secret
  ## is drawn from the operating system; in `convergentMode` the zero
  ## secret is used.
  var secret: Secret
  case mode
  of uniqueMode: doAssert urandom(secret.bytes)
  of convergentMode: discard
  result = reopenErisIngest(store, cap, secret)
proc chunkSize*(ingest: ErisIngest): ChunkSize =
  ## The chunk size `ingest` encodes with.
  result = ingest.chunkSize

proc position*(ingest: ErisIngest): BiggestUInt =
  ## Get the current append position of ``ingest``.
  ## This is same as the number of bytes appended.
  result = ingest.pos
# Forward declaration: `commitBuffer` and `commitLevel` call each other.
proc commitLevel(ingest: ErisIngest; level: TreeLevel): Future[void] {.gcsafe.}
proc commitBuffer(ingest: ErisIngest; level: TreeLevel) {.async.} =
## Encrypt the chunk buffer as a chunk of tree level `level`, put it to
## the store and record its pair in the pending tree. When that fills a
## node's worth of pairs the node is committed in turn.
let pair =
if level == 0: encryptLeafFuture(ingest.secret, ingest.futPut)
else: encryptNodeFuture(level, ingest.futPut)
# The future must be attached before the store is asked to put.
var f = asFuture(ingest.futPut)
put(ingest.store, ingest.futPut)
await f
# Encryption left the buffer marked as ciphertext; mark it ready for
# the next plaintext chunk.
ingest.futPut.status = plaintext
# First chunk at this level: add a list for its pairs.
if ingest.tree.len == level.int:
ingest.tree.add(newSeqOfCap[Pair](ingest.chunkSize.arity))
ingest.tree[level].add(pair)
# A full list of pairs becomes a node one level up.
if ingest.tree[level].len == ingest.chunkSize.arity:
await commitLevel(ingest, level)
proc commitLevel(ingest: ErisIngest; level: TreeLevel): Future[void] =
  ## Serialize the pending pairs of tree level `level` into the chunk
  ## buffer, zero the rest of the buffer, clear the pending list and
  ## commit the buffer as a node of the next level up.
  var off = 0
  for pair in ingest.tree[level]:
    # Each pair occupies 64 bytes: the reference, then the key.
    copyMem(addr ingest.buffer[off], unsafeAddr pair.r.bytes[0], 32)
    copyMem(addr ingest.buffer[off + 32], unsafeAddr pair.k.bytes[0], 32)
    off += 64
  let remaining = ingest.chunkSize.int - off
  if remaining > 0:
    zeroMem(addr ingest.buffer[off], remaining)
  ingest.tree[level].setLen(0)
  result = commitBuffer(ingest, succ level)
proc append*(ingest: ErisIngest; data: string|seq[byte]) {.async.} =
  ## Ingest content. `data` is copied into the chunk buffer piece by
  ## piece and every chunk that becomes full is committed to the store.
  doAssert(not ingest.invalid)
  assertIdle ingest.futPut
  var consumed = 0
  while consumed < data.len:
    let
      fill = ingest.chunkSize.mask ingest.pos.int
      take = min(data.len - consumed, ingest.chunkSize.int - fill)
    copyMem(addr ingest.buffer[fill], unsafeAddr data[consumed], take)
    inc(ingest.pos, take)
    inc(consumed, take)
    # The position reached a chunk boundary: the buffer is full.
    if (ingest.chunkSize.mask ingest.pos.int) == 0:
      await commitBuffer(ingest, 0)
proc append*(ingest: ErisIngest; stream: Stream) {.async.} =
  ## Ingest content from a `Stream` until it is at its end or yields no
  ## more data. Every chunk that becomes full is committed to the store.
  assertIdle ingest.futPut
  while not stream.atEnd:
    let fill = ingest.chunkSize.mask ingest.pos.int
    # Read straight into the free part of the chunk buffer.
    let got = readData(stream, addr ingest.buffer[fill], ingest.chunkSize.int - fill)
    if got == 0: break
    inc(ingest.pos, got)
    # The position reached a chunk boundary: the buffer is full.
    if (ingest.chunkSize.mask ingest.pos.int) == 0:
      await commitBuffer(ingest, 0)
proc padToNextBlock*(ingest: ErisIngest; pad = 0x80'u8): Future[void] =
  ## Pad the ingest stream until the start of the next chunk and commit
  ## the padded chunk. The buffer is filled with `pad` from the current
  ## offset up to, but not including, its last byte; the last byte is
  ## always set to `0x80`.
  let
    start = ingest.chunkSize.mask ingest.pos.int
    last = ingest.buffer.high
  for i in start..<last:
    ingest.buffer[i] = pad
  ingest.buffer[last] = 0x80
  # Move the position to the next chunk boundary.
  let chunkIndex = ingest.pos div ingest.chunkSize
  ingest.pos = (chunkIndex + 1) * ingest.chunkSize
  result = commitBuffer(ingest, 0)
proc cap*(ingest: ErisIngest): Future[ErisCap] {.async.} =
## Derive the ``ErisCap`` of ``ingest``.
## The state of `ingest` is afterwards invalid until `reinit` or
## `reopen` is called.
assertIdle ingest.futPut
var cap = ErisCap(chunkSize: ingest.chunkSize)
# Pad the final chunk: a 0x80 marker after the content, then zeros.
let padOff = ingest.chunkSize.mask ingest.pos.int
ingest.buffer.setLen(padOff)
ingest.buffer.setLen(ingest.chunkSize.int) # zero the "fresh" buffer space
ingest.buffer[padOff] = 0x80
await commitBuffer(ingest, 0)
# Walk up the pending tree, committing partially filled levels, until
# the top level holds exactly one pair: that pair is the root.
for level in 0..255:
if ingest.tree.high == level and ingest.tree[level].len == 1:
cap.pair = pop ingest.tree[level]
cap.level = uint8 level
break
else:
if ingest.tree.len > 0 and ingest.tree[level].len > 0:
await commitLevel(ingest, TreeLevel level)
ingest.invalid = true
return cap
proc encode*(store; chunkSize: ChunkSize; content: Stream; secret: Secret):
    Future[(ErisCap, uint64)] {.async.} =
  ## Asynchronously encode ``content`` into ``store`` and derive its
  ## ``ErisCap``. The future yields the capability together with the
  ## number of bytes that were ingested.
  let ingest = newErisIngest(store, chunkSize, secret)
  await append(ingest, content)
  let rootCap = await ingest.cap
  return (rootCap, ingest.position)

proc encode*(store; chunkSize: ChunkSize; content: Stream; mode = uniqueMode): Future[(ErisCap, uint64)] =
  ## Asynchronously encode ``content`` into ``store`` and derive its
  ## ``ErisCap``. In `uniqueMode` a random secret is drawn from the
  ## operating system; in `convergentMode` the zero secret is used.
  var secret: Secret
  case mode
  of uniqueMode: doAssert urandom(secret.bytes)
  of convergentMode: discard
  result = encode(store, chunkSize, content, secret)
proc encode*(store; content: Stream; mode = uniqueMode): Future[(ErisCap, uint64)] {.async.} =
  ## Asynchronously encode ``content`` into ``store`` and derive its ``ErisCap``.
  ## The chunk size is 1KiB unless the content is at least 16KiB.
  # Read up to one large chunk ahead to learn how much content there is.
  var head = content.readStr(chunk32k.int)
  let ingest = newErisIngest(store, recommendedChunkSize(head.len), mode)
  await ingest.append(head)
  reset head
  await ingest.append(content)
  let rootCap = await ingest.cap
  return (rootCap, ingest.position)

proc encode*(store; chunkSize: ChunkSize; content: string; mode = uniqueMode): Future[(ErisCap, uint64)] =
  ## Asynchronously encode the string ``content`` into ``store`` and
  ## derive its ``ErisCap``.
  let stream = newStringStream(content)
  result = encode(store, chunkSize, stream, mode)
proc erisCap*(content: string; chunkSize: ChunkSize): ErisCap =
  ## Derive a convergent ``ErisCap`` for ``content`` without storing any
  ## chunks.
  runnableExamples:
    assert:
      $erisCap("Hello world!", chunk1k) ==
        "urn:eris:BIAD77QDJMFAKZYH2DXBUZYAP3MXZ3DJZVFYQ5DFWC6T65WSFCU5S2IT4YZGJ7AC4SYQMP2DM2ANS2ZTCP3DJJIRV733CRAAHOSWIYZM3M"
  # DiscardStore will complete this immediately
  let (derived, _) = waitFor encode(
    newDiscardStore(), chunkSize, newStringStream(content), convergentMode)
  result = derived
type
Collector = ref object
## State shared by the recursive `collect` calls.
store: ErisStore ## Store that chunks are fetched from.
set: HashSet[Reference] ## References seen so far.
chunkSize: ChunkSize
proc collect(col: Collector; pair: Pair; level: TreeLevel; getAll: bool) {.async.} =
## Fetch the node `pair` of tree level `level` and add the references of
## its children to the set of `col`, descending into child nodes that
## have not been seen before. With `getAll` the leaf chunks are fetched
## as well, though their data is not kept.
assert level > 0
var futures: seq[Future[void]]
var blk = await get(col.store, pair, level, col.chunkSize)
for pair in blk.chunkPairs:
# Each reference is visited only once.
if pair.r notin col.set:
col.set.incl pair.r
if level > 1:
# The child is a node: descend into it.
futures.add collect(col, pair, level.pred, getAll)
elif getAll:
# The child is a leaf: request it from the store.
var blk = newFutureGet(pair.r, col.chunkSize)
futures.add asFuture(blk)
get(col.store, pair.r, blk)
# Wait for all descents and leaf requests started above.
await all(futures)
proc references*(store: ErisStore; cap: ErisCap): Future[HashSet[Reference]] {.async.} =
  ## Collect the set of `Reference`s that constitute an `ErisCap`: the
  ## root reference plus, for capabilities above level 0, the references
  ## of all nodes and leaves below it. Leaf chunks are not fetched.
  if cap.level == 0:
    return [cap.pair.r].toHashSet
  var col = Collector(store: store, chunkSize: cap.chunkSize)
  await collect(col, cap.pair, cap.level, false)
  col.set.incl cap.pair.r
  return col.set
proc getAll*(store: ErisStore; cap: ErisCap): Future[void] =
  ## Get all chunks that constitute `cap` from `store`.
  ## No data is returned, this procedure is for ensuring
  ## that all chunks are present at some store.
  if cap.level > 0:
    # Walk the tree and request every leaf along the way.
    let col = Collector(store: store, chunkSize: cap.chunkSize)
    return collect(col, cap.pair, cap.level, true)
  # A level-0 capability consists of a single chunk.
  var blk = newFutureGet(cap.pair.r, cap.chunkSize)
  result = asFuture(blk)
  get(store, cap.pair.r, blk)