# SPDX-FileCopyrightText: 2022 Endo Renberg
# SPDX-License-Identifier: Unlicense

## This module provides the basic procedures and types for ERIS encoding
## and decoding. Encoding and decoding always requires an `ErisStore`
## receiver object, and store operations are asynchronous.
##
## `ErisStore` objects are implemented in additional modules.

runnableExamples:
  import eris/memory_stores
  import std/[asyncdispatch, streams]

  let
    text = "Hello world!"
    store = newMemoryStore()
    (capA, _) = waitFor encode(store, newStringStream(text))
    (capB, _) = waitFor encode(store, newStringStream(text), convergentMode)
    (capC, _) = waitFor encode(store, newStringStream(text), convergentMode)
  assert capA != capB
  assert capB == capC
  assert waitFor(decode(store, capA)) == waitFor(decode(store, capB))
  assert waitFor(decode(store, capB)) == waitFor(decode(store, capC))

## An `ErisStore` is implemented by `get` and `put` methods. Both
## operate on a variant of the `FutureBlock` type which holds a buffer,
## parameters, and callbacks.

runnableExamples:
  import eris/memory_stores
  import std/[asyncdispatch, streams]

  type LoggerStore {.final.} = ref object of ErisStore
    other: ErisStore

  proc newLogger(s: ErisStore): LoggerStore =
    LoggerStore(other: s)

  method get(logger: LoggerStore; fut: FutureGet) =
    fut.addCallback:
      if fut.failed:
        stderr.writeLine("failed to get ", fut.chunkSize.int, " byte chunk ", fut.`ref`)
      else:
        stderr.writeLine("got ", fut.chunkSize.int, " byte chunk ", fut.`ref`)
    get(logger.other, fut)

  method put(logger: LoggerStore; fut: FuturePut) =
    fut.addCallback:
      if fut.failed:
        stderr.writeLine("failed to put ", fut.chunkSize.int, " byte chunk ", fut.`ref`)
      else:
        stderr.writeLine("put ", fut.chunkSize.int, " byte chunk ", fut.`ref`)
    put(logger.other, fut)

  let
    store = newMemoryStore()
    logger = newLogger(store)
    (cap, _) = waitFor encode(logger, newStringStream("Hail ERIS!"))
  discard waitFor decode(logger, cap)

import std/[asyncdispatch, hashes, math, sets, streams, strutils, sysrand]
import base32, eris/private/[chacha20/src/chacha20, blake2/blake2]

const erisCborTag* = 276
  ## CBOR tag for ERIS binary read capabilities.
  ## https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml

type
  Mode* = enum
    ## Type for specifying if an encoding shall be unique or convergent.
    ## See [section 2.3](https://eris.codeberg.page/spec/#section-2.3)
    ## for an explanation of encoding modes.
    uniqueMode,
    convergentMode,
  ChunkSize* = enum
    ## Valid chunk sizes.
    chunk1k = 1 shl 10   ## 1 KiB
    chunk32k = 32 shl 10 ## 32 KiB
  TreeLevel* = uint8
  Reference* = object ## Reference to an encrypted chunk.
    bytes*: array[32, byte]
  Key* = object ## Key for decrypting a chunk.
    bytes*: array[32, byte]
  Secret* = object ## Secret for salting a `Key`.
    bytes*: array[32, byte]
  Pair* {.packed.} = object
    r*: Reference
    k*: Key
  ErisCap* = object ## A capability for retrieving ERIS encoded data.
    pair*: Pair
    level*: uint8
    chunkSize*: ChunkSize

using
  key: Key
  secret: Secret
  pair: Pair
  cap: ErisCap

assert(sizeOf(Pair) == 64)

func arity*(bs: ChunkSize): int = bs.int shr 6
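
## A chunk holds `arity` reference-key pairs of 64 bytes each. A minimal
## sketch of the arithmetic (not part of the upstream examples):

runnableExamples:
  assert chunk1k.arity == 16    # 1024 div 64
  assert chunk32k.arity == 512  # 32768 div 64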

func toByte*(bs: ChunkSize): uint8 =
  case bs
  of chunk1k: 0x0a'u8
  of chunk32k: 0x0f'u8

func toChar*(bs: ChunkSize): char =
  case bs
  of chunk1k: 'A'
  of chunk32k: 'F'

func mask(bs: ChunkSize; n: int): int = n and bs.int.pred

func `*`*[T: SomeUnsignedInt](x: T; bs: ChunkSize): T =
  ## Convenience function to multiply an integer by a `ChunkSize` value.
  case bs
  of chunk1k: x shl 0x0a
  of chunk32k: x shl 0x0f

func `div`*[T: SomeUnsignedInt](x: T; bs: ChunkSize): T =
  ## Convenience function to divide an integer by a `ChunkSize` value.
  case bs
  of chunk1k: x shr 0x0a
  of chunk32k: x shr 0x0f
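
## A minimal sketch of the shift-based helpers above:

runnableExamples:
  assert 2'u64 * chunk1k == 2048'u64
  assert 65536'u64 div chunk32k == 2'u64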

func recommendedChunkSize*(dataLength: Natural): ChunkSize =
  ## Return the recommended `ChunkSize` for encoding data of the
  ## given length. The current implementation selects 1KiB chunks for
  ## lengths less than 16KiB and 32KiB chunks otherwise. The reasoning
  ## is that anything less than 16KiB is encoded in a tree no more than
  ## two levels deep, and while 16KiB of data wastes nearly half of a
  ## 32KiB chunk, it requires only a single chunk to be fetched, whereas
  ## 16KiB-1 in 1KiB chunks would require 17 chunk requests.
  ## The behavior of this function is not guaranteed to remain constant,
  ## and because of storage-efficiency and latency tradeoffs it may not
  ## yield the best choice for all applications.
  if dataLength < (16 shl 10): chunk1k else: chunk32k
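
## A sketch of the selection rule described above:

runnableExamples:
  assert recommendedChunkSize(0) == chunk1k
  assert recommendedChunkSize(16 shl 10) == chunk32k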

proc `$`*(x: Reference|Key|Secret): string =
  ## Encode to Base32.
  base32.encode(cast[array[32, char]](x.bytes), pad = false)

proc `==`*(x, y: ErisCap): bool = x.pair.r.bytes == y.pair.r.bytes

proc hash*(r: Reference): Hash =
  ## Reduce a `Reference` to a `Hash` value.
  copyMem(addr result, unsafeAddr r.bytes[0], sizeof(result))
  result = !$result

proc hash*(cap): Hash {.inline.} = hash(cap.pair.r)

const chunkSizes = {chunk1k.int, chunk32k.int}

proc reference*[T: byte|char](data: openarray[T]): Reference =
  ## Derive the `Reference` for a 1KiB or 32KiB buffer.
  assert(data.len in chunkSizes)
  var ctx: Blake2b
  ctx.init(32)
  ctx.update(data)
  ctx.final(result.bytes)
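
## Identical chunks always yield the same `Reference`. A minimal sketch:

runnableExamples:
  let buf = newSeq[byte](chunk1k.int)
  assert reference(buf) == reference(buf)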

proc bytes*(cap): seq[byte] =
  ## Binary encoding of the read-capability.
  result = newSeqOfCap[byte](1+1+32+32)
  result.add cap.chunkSize.toByte
  result.add cap.level
  result.add cap.pair.r.bytes
  result.add cap.pair.k.bytes

func toBase32*(cap): string =
  base32.encode(cast[seq[char]](cap.bytes), pad = false)

proc `$`*(cap): string =
  ## Encode an ``ErisCap`` to standard URN form.
  ## https://inqlab.net/projects/eris/#_urn
  "urn:eris:" & cap.toBase32

proc fromBase32*[T: Reference | Key | Secret](v: var T; s: string): bool =
  try:
    var buf = base32.decode(s)
    if buf.len == v.bytes.len:
      copyMem(v.bytes[0].addr, buf[0].addr, v.bytes.len)
      result = true
  except: discard
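
## A sketch of a Base32 round-trip through `$` and `fromBase32`:

runnableExamples:
  var a, b: Reference
  a.bytes[0] = 0xff
  assert b.fromBase32($a)
  assert a == b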

proc parseCap*[T: char|byte](bin: openArray[T]): ErisCap =
  assert(bin.len == 66)
  result.chunkSize =
    case bin[0].byte
    of chunk1k.toByte: chunk1k
    of chunk32k.toByte: chunk32k
    else: raise newException(ValueError, "invalid ERIS chunk size")
  result.level = uint8 bin[1]
  copyMem(addr result.pair.r.bytes[0], unsafeAddr bin[2], 32)
  copyMem(addr result.pair.k.bytes[0], unsafeAddr bin[34], 32)

proc parseErisUrn*(urn: string): ErisCap =
  ## Decode a URN to an ``ErisCap``.
  let parts = urn.split(':')
  if 3 <= parts.len:
    if parts[0] == "urn":
      if parts[1] == "eris":
        if parts[2].len >= 106:
          let bin = base32.decode(parts[2][0..105])
          return parseCap(bin)
  raise newException(ValueError, "invalid ERIS URN encoding")
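
## A sketch of parsing the capability URN derived in the `erisCap`
## example further below:

runnableExamples:
  let cap = parseErisUrn("urn:eris:BIAD77QDJMFAKZYH2DXBUZYAP3MXZ3DJZVFYQ5DFWC6T65WSFCU5S2IT4YZGJ7AC4SYQMP2DM2ANS2ZTCP3DJJIRV733CRAAHOSWIYZM3M")
  assert cap.chunkSize == chunk1k
  assert cap.level == 0
  assert parseCap(cap.bytes) == cap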

type
  BlockStatus* = enum unknown, verified, plaintext
  FutureBlock* = ref FutureBlockObj
  FutureBlockObj = object of RootObj
    `ref`: Reference
    error*: ref Exception
    buffer*: seq[byte]
    # I considered making this an array but alignment
    # of the final object on the heap would be wack
    callbacks: seq[proc () {.closure, gcsafe.}]
    chunkSize: ChunkSize
    status: BlockStatus

  FutureGet* = ref FutureGetObj
  FutureGetObj {.final.} = object of FutureBlockObj
    discard

  FuturePut* = ref FuturePutObj
  FuturePutObj {.final.} = object of FutureBlockObj
    discard

proc assertAtEnd(blk: FutureBlock) =
  doAssert blk.callbacks.len == 0

proc assertIdle(blk: FutureBlock) =
  doAssert blk.callbacks.len == 0
  doAssert blk.error.isNil

proc assertVerified*(blk: FutureBlock) =
  doAssert blk.status == verified, $blk.`ref`

proc addCallback*(blk: FutureBlock; cb: proc () {.closure, gcsafe.}) =
  ## Add a callback to a `FutureBlock`. Callbacks are called last-in-first-out
  ## when `complete` is called on `blk`.
  blk.callbacks.add cb
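
## A sketch of the last-in-first-out callback order; `complete` is
## normally called by a store implementation:

runnableExamples:
  proc demo() =
    var order = ""
    let blk = newFuturePut(chunk1k)
    blk.addCallback(proc () = order.add 'a')
    blk.addCallback(proc () = order.add 'b')
    complete(blk)
    doAssert order == "ba"
  demo()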

template withFuture[T](blk: FutureBlock; fut: Future[T]; body: untyped): untyped =
  assertIdle blk
  blk.addCallback do:
    if blk.failed: fail(fut, blk.error)
    else:
      try:
        body
        when T is void:
          complete(fut)
      except: fail(fut, getCurrentException())

template asFuture*(blk: FutureBlock; body: untyped): untyped =
  assertIdle blk
  var fut = newFuture[void]("asFuture")
  blk.addCallback do:
    if not blk.error.isNil: fail(fut, blk.error)
    else:
      try:
        body
        complete(fut)
      except: fail(fut, getCurrentException())
  fut

template asFuture*(blk: FutureBlock): untyped =
  assertIdle blk
  var fut = newFuture[void]("asFuture")
  blk.addCallback do:
    if blk.failed: fail(fut, blk.error)
    else: complete(fut)
  fut

proc addCallback*(fut: Future; blk: FutureBlock; cb: proc () {.closure, gcsafe.}) =
  fut.addCallback do:
    if fut.failed: fail(blk, fut.error)
    else:
      try: cb()
      except: fail(blk, getCurrentException())

proc newFutureGet*(bs: ChunkSize): FutureGet =
  FutureGet(buffer: newSeq[byte](bs.int), chunkSize: bs)

proc newFutureGet*(`ref`: Reference; bs: ChunkSize): FutureGet =
  FutureGet(`ref`: `ref`, buffer: newSeq[byte](bs.int), chunkSize: bs)

proc newFuturePut*(bs: ChunkSize): FuturePut =
  FuturePut(buffer: newSeq[byte](bs.int), status: plaintext, chunkSize: bs)

proc newFuturePut*(`ref`: Reference; bs: ChunkSize): FuturePut =
  FuturePut(`ref`: `ref`, buffer: newSeq[byte](bs.int), status: plaintext, chunkSize: bs)

proc newFuturePut*[T: byte|char](buffer: openarray[T]): FuturePut =
  case buffer.len
  of chunk1k.int:
    result = FuturePut(`ref`: reference(buffer), buffer: newSeq[byte](buffer.len), chunkSize: chunk1k, status: verified)
  of chunk32k.int:
    result = FuturePut(`ref`: reference(buffer), buffer: newSeq[byte](buffer.len), chunkSize: chunk32k, status: verified)
  else: raiseAssert "invalid buffer size"
  copyMem(addr result.buffer[0], unsafeAddr buffer[0], result.buffer.len)
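
## A minimal sketch: a `FuturePut` over an existing chunk-sized buffer
## is referenced and verified up-front:

runnableExamples:
  let blk = newFuturePut(newSeq[byte](chunk1k.int))
  assert blk.verified
  assert blk.`ref` == reference(newSeq[byte](chunk1k.int))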

func failed*(blk: FutureBlock): bool {.inline.} = not blk.error.isNil

proc `ref`*(blk: FutureBlock): Reference {.inline.} = blk.`ref`
proc `ref=`*(blk: FutureBlock; r: Reference) {.inline.} = blk.`ref` = r

func chunkSize*(blk: FutureBlock): ChunkSize {.inline.} = blk.chunkSize

func buffer*(blk: FutureBlock): pointer {.inline.} =
  assert blk.buffer.len == blk.chunkSize.int
  unsafeAddr blk.buffer[0]

func verified*(blk: FutureBlock): bool {.inline.} = blk.status == verified

proc verify*(blk: FutureBlock): bool {.discardable.} =
  ## Verify that `blk` corresponds to `ref` and set the chunk error
  ## otherwise.
  assert not blk.verified, "FutureBlock already verified or improperly initialized"
  var
    digest: Reference
    ctx: Blake2b
  ctx.init(32)
  ctx.update(blk.buffer)
  ctx.final(digest.bytes)
  result = digest.bytes == blk.`ref`.bytes
  if result: blk.status = verified
  else: blk.error = newException(IOError, "ERIS chunk does not match reference")

proc verify*(blk: FutureBlock; `ref`: Reference): bool {.discardable, deprecated.} =
  assert blk.`ref` == `ref`
  verify(blk)

proc complete*(blk: FutureBlock) =
  ## Complete a `FutureBlock`.
  assert blk.callbacks.len > 0
  let cb = pop blk.callbacks
  try: cb()
  except Exception as e: blk.error = e
  if blk.callbacks.len > 0: complete(blk)

proc complete*(blk: FutureGet; src: pointer; len: Natural; status = unknown) =
  ## Complete a `Get` `FutureBlock` with the chunk at `src`.
  blk.status = status
  assert len == blk.buffer.len
  copyMem(addr blk.buffer[0], src, len)
  if status != verified: verify(blk)
  complete(blk)

proc complete*(blk: FutureGet; buf: sink seq[byte]; status = unknown) =
  blk.status = status
  doAssert buf.len == blk.buffer.len
  blk.buffer = move buf
  if status != verified: verify(blk)
  complete(blk)

proc complete*[T: byte|char](blk: FutureGet; buf: openarray[T]; status = unknown) {.inline.} =
  complete(blk, unsafeAddr buf[0], buf.len, status)

proc fail*(blk: FutureBlock; e: ref Exception) =
  blk.error = e
  complete(blk)

proc copy*(blk: FutureBlock; dst: pointer; len: Natural) =
  ## Copy chunk data out of a `FutureBlock`.
  assertVerified blk
  doAssert len <= blk.buffer.len
  if blk.error.isNil: copyMem(dst, addr blk.buffer[0], len)
  else: raise blk.error

proc notFound*(blk: FutureBlock; msg = "") {.inline.} =
  ## Fail `blk` with a `KeyError` exception.
  fail(blk, newException(KeyError, msg))

proc moveBytes*(blk: FutureBlock): owned seq[byte] =
  ## Move the `seq[byte]` out of a `FutureBlock`.
  ## This is only safe to use in the first callback added to `blk`
  ## because it will be called last.
  assert(blk.buffer.len == blk.chunkSize.int)
  move blk.buffer

proc toBytes*(blk: FutureBlock): owned seq[byte] =
  result.setLen(blk.buffer.len)
  copyMem(addr result[0], unsafeAddr blk.buffer[0], result.len)

proc crypto*(blk: FutureBlock; key; level: TreeLevel) =
  var nonce: Nonce
  nonce[0] = level.uint8
  discard chacha20(key.bytes, nonce, 0, blk.buffer, blk.buffer)
  case blk.status
  of verified: blk.status = plaintext
  of plaintext: blk.status = verified
  else: raiseAssert "invalid chunk status"

proc encryptLeafFuture(secret; blk: FutureBlock): Pair =
  assert blk.status == plaintext, $blk.status
  var ctx: Blake2b
  ctx.init(32, secret.bytes)
  ctx.update(blk.buffer)
  ctx.final(result.k.bytes)
  crypto(blk, result.k, 0)
  ctx.init(32)
  ctx.update(blk.buffer)
  ctx.final(blk.`ref`.bytes)
  result.r = blk.`ref`

proc encryptNodeFuture(level: TreeLevel; blk: FutureBlock): Pair =
  assert blk.status == plaintext
  var ctx: Blake2b
  ctx.init(32)
  ctx.update(blk.buffer)
  ctx.final(result.k.bytes)
  crypto(blk, result.k, level)
  ctx.init(32)
  ctx.update(blk.buffer)
  ctx.final(blk.`ref`.bytes)
  result.r = blk.`ref`

proc unpaddedLen(buf: openarray[byte]): int {.inline.} =
  result = buf.high
  while result >= 0:
    case buf[result]
    of 0x00: discard
    of 0x80:
      return
    else: break
    dec result
  raise newException(IOError, "invalid ERIS chunk padding")

proc unpaddedLen(blk: FutureBlock): int {.inline.} = blk.buffer.unpaddedLen

iterator chunkPairs*(blk: seq[byte]): Pair =
  var n = blk.high
  while blk[n] == 0x00: dec n # TODO: make this fast
  n = n shr 6
  let buf = cast[ptr UncheckedArray[Pair]](blk[0].unsafeAddr)
  for i in 0..n: yield buf[i]

type
  Operation* = enum Get, Put
  Operations* = set[Operation]
  ErisStore* = ref ErisStoreObj ## Object for interfacing ERIS storage.
  ErisStoreObj* = object of RootObj
    discard

using store: ErisStore

method id*(store): string {.base.} =
  ## Get an `id` for `store`. Should be unique within a running program.
  "ErisStore@0x" & toHex(cast[ByteAddress](store[].unsafeAddr))

proc `$`*(store): string = store.id()

method get*(store; blk: FutureGet) {.base.} =
  ## Method for getting a chunk from a ``Store``.
  ## The result is not decrypted but should be verified.
  blk.notFound("get not implemented for this ErisStore")

proc get*(store; `ref`: Reference; blk: FutureGet) =
  assert `ref` == blk.`ref`, $blk.`ref`
  assert blk.status != verified
  get(store, blk)

proc getBlock(store: ErisStore, `ref`: Reference; bs: ChunkSize): Future[seq[byte]] =
  var
    fut = newFuture[seq[byte]]("eris.getBlock")
    blk = newFutureGet(`ref`, bs)
  blk.withFuture(fut) do:
    complete(fut, blk.moveBytes)
  get(store, blk)
  fut

proc getBlock*(store: ErisStore, `ref`: Reference): Future[seq[byte]] {.async, deprecated.} =
  ## This requests a small chunk with a fallback to a large chunk. Do not use it.
  try:
    result = await getBlock(store, `ref`, chunk1k)
  except:
    result = await getBlock(store, `ref`, chunk32k)

proc get*(store; pair: Pair; level: TreeLevel; bs: ChunkSize): Future[seq[byte]] =
  var
    fut = newFuture[seq[byte]]("eris.get")
    blk = newFutureGet(pair.r, bs)
  blk.withFuture(fut):
    assertAtEnd blk
    assertVerified blk
    crypto(blk, pair.k, level)
    complete(fut, move blk.buffer)
  get(store, pair.r, blk)
  fut

method put*(store; blk: FuturePut) {.base.} =
  ## Method for putting an encrypted chunk to a ``Store``.
  fail(blk, newException(IOError, "put not implemented for this ErisStore"))

method hasBlock*(store; r: Reference; bs: ChunkSize): Future[bool] {.base.} =
  ## Test if `store` has a chunk for a `Reference`.
  ## For some stores this is cheaper than retrieving a chunk.
  when defined(release):
    raiseAssert "hasBlock not implemented for this store"
  else:
    var
      fut = newFuture[bool]("hasBlock")
      blk = newFutureGet(r, bs)
    blk.addCallback do:
      fut.complete(blk.status == verified)
    get(store, r, blk)
    fut
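
## A sketch against the in-memory store from `eris/memory_stores`:

runnableExamples:
  import eris/memory_stores
  import std/[asyncdispatch, streams]

  let
    store = newMemoryStore()
    (cap, _) = waitFor encode(store, newStringStream("Hail ERIS!"))
  assert waitFor(store.hasBlock(cap.pair.r, cap.chunkSize))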

method close*(store) {.base.} = discard
  ## Method for closing a `Store`.

type DiscardStore* {.final.} = ref object of ErisStoreObj

method hasBlock(s: DiscardStore; r: Reference; bs: ChunkSize): Future[bool] =
  result = newFuture[bool]("DiscardStore.hasBlock")
  result.complete(false)

method put(s: DiscardStore; blk: FuturePut) =
  complete(blk)

proc newDiscardStore*(): DiscardStore =
  ## Create an ``ErisStore`` that discards writes and fails to read.
  new(result)
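
## A minimal sketch of the discard semantics:

runnableExamples:
  import std/asyncdispatch

  let store = newDiscardStore()
  assert not waitFor(store.hasBlock(Reference(), chunk1k))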

type
  ErisStream* = ref ErisStreamObj ## An object representing data streams.
  ErisStreamObj = object
    store*: ErisStore
    pos: BiggestUInt
    leaves: seq[Pair]
    cap: ErisCap
    stopped: bool
    futGet: FutureGet

proc buf(s: ErisStream): var seq[byte] {.inline.} = s.futGet.buffer

proc newErisStream*(store; cap): owned ErisStream =
  ## Open a new stream for reading ERIS data.
  result = ErisStream(
    store: store,
    cap: cap)

proc close*(s: ErisStream) =
  ## Release the resources of an ``ErisStream``.
  reset s.store
  reset s.pos
  reset s.leaves

proc cap*(s: ErisStream): ErisCap = s.cap

proc getLeaves(store: ErisStore; cap: ErisCap): Future[seq[Pair]] {.async.} =
  if cap.level == 0: return @[cap.pair]
  else:
    var leaves = newSeqOfCap[Pair](((cap.chunkSize.arity ^ cap.level) div 4) * 3)
    # TODO: this math is only a guess
    proc expand(level: TreeLevel; pair: Pair) {.async.} =
      # Expand on the stack
      var blk = await get(store, pair, level, cap.chunkSize)
      if level == 1:
        for p in blk.chunkPairs:
          leaves.add(p)
      else:
        for p in blk.chunkPairs:
          await expand(level.pred, p)
    await expand(cap.level, cap.pair)
    return leaves

proc init(s: ErisStream) {.async.} =
  s.futGet = newFutureGet(s.cap.chunkSize)
  if s.leaves.len == 0:
    s.leaves = await getLeaves(s.store, s.cap)

proc atEnd*(s: ErisStream): bool = s.stopped
  ## Check if an ``ErisStream`` is positioned at its end.
  ## May return false negatives.

proc setPosition*(s: ErisStream; pos: BiggestUInt) =
  ## Seek an ``ErisStream``.
  s.pos = pos
  s.stopped = false

proc getPosition*(s: ErisStream): BiggestUInt =
  ## Return the position of an ``ErisStream``.
  s.pos
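
## A sketch of seeking within a stream (assuming `eris/memory_stores`):

runnableExamples:
  import eris/memory_stores
  import std/[asyncdispatch, streams]

  let
    store = newMemoryStore()
    (cap, _) = waitFor encode(store, newStringStream("0123456789"))
    stream = newErisStream(store, cap)
  stream.setPosition(6)
  assert cast[string](waitFor stream.read(4)) == "6789"
  assert stream.getPosition == 10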

proc loadBlock*(s: ErisStream, bNum: BiggestUInt): Future[void] =
  assertIdle s.futGet
  s.futGet.`ref` = s.leaves[bNum].r
  result = s.futGet.asFuture:
    assert s.futGet.callbacks.len == 0
    assertVerified s.futGet
    crypto(s.futGet, s.leaves[bNum].k, 0)
  s.futGet.status = BlockStatus.unknown
  get(s.store, s.leaves[bNum].r, s.futGet)

proc length*(s: ErisStream): Future[BiggestUInt] {.async.} =
  ## Determine the length of an ``ErisStream``.
  ## The result is exact; the padding of the final chunk is not counted.
  await init(s)
  var len = s.leaves.len.pred.BiggestUInt * s.cap.chunkSize
  await loadBlock(s, s.leaves.high.BiggestUInt)
  assertIdle s.futGet
  assert s.futGet.status == plaintext
  result = len + s.futGet.buffer.unpaddedLen.BiggestUInt

proc readBuffer*(s: ErisStream; buffer: pointer; bufLen: int): Future[int] {.async.} =
  if s.leaves == @[]: await init(s)
  var
    bNum = s.pos div s.cap.chunkSize
    buf = cast[ptr UncheckedArray[byte]](buffer)
    bufOff = 0
  while bufOff < bufLen and bNum < s.leaves.len.BiggestUInt:
    await loadBlock(s, bNum)
    let blkOff = s.cap.chunkSize.mask s.pos.int
    var n = s.buf.len
    if bNum == s.leaves.high.BiggestUInt:
      n = s.buf.unpaddedLen
      if n < blkOff:
        s.stopped = true
        break
    n = min(bufLen - bufOff, n - blkOff)
    copyMem(unsafeAddr(buf[bufOff]), addr (s.buf[blkOff]), n)
    inc(bNum)
    inc(bufOff, n)
    inc(s.pos, n)
  return bufOff

proc read*(s: ErisStream; size: int): Future[seq[byte]] {.async.} =
  var buf = newSeq[byte](size)
  let n = await s.readBuffer(buf[0].addr, buf.len)
  buf.setLen(n)
  return buf

proc readLine*(s: ErisStream): Future[string] {.async.} =
  # TODO: buffer a chunk.
  if s.leaves == @[]: await init(s)
  var
    line = ""
    bNum = s.pos div s.cap.chunkSize
  while true:
    await loadBlock(s, bNum)
    var
      blkOff = s.cap.chunkSize.mask line.len
      n = s.buf.len
    if bNum == s.leaves.high.BiggestUInt:
      n = s.buf.unpaddedLen
    for i in blkOff..<n:
      let c = s.buf[i].char
      if c in Newlines:
        return line
      line.add(c)
    inc(bNum)
    if n < s.cap.chunkSize.int:
      return line

proc readDataStr*(s: ErisStream; buffer: var string; slice: Slice[int]): Future[int] =
  readBuffer(s, addr buffer[slice.a], slice.b + 1 - slice.a)

proc readAll*(s: ErisStream): Future[seq[byte]] {.async.} =
  ## Read all data from the specified ``ErisStream``.
  var
    len = await s.length
    buf = newSeq[byte](int(len - getPosition(s)))
  let n = await readBuffer(s, addr buf[0], buf.len)
  assert n == buf.len
  result = buf

proc dump*(s: ErisStream; stream: Stream) {.async.} =
  if s.leaves == @[]: await init(s)
  var
    bNum = s.pos div s.cap.chunkSize
    bufOff = 0
  while bNum < s.leaves.len.BiggestUInt:
    await loadBlock(s, bNum)
    var
      blkOff = s.cap.chunkSize.mask s.pos.int
      n = s.buf.len
    if bNum == s.leaves.high.BiggestUInt:
      n = s.buf.unpaddedLen
      if n < blkOff:
        s.stopped = true
        break
    n.dec blkOff
    writeData(stream, addr (s.buf[blkOff]), n)
    inc(bNum)
    inc(bufOff, n)
    inc(s.pos, n)

proc decode*(store; cap): Future[seq[byte]] =
  ## Asynchronously decode ``cap`` from ``store``.
  readAll(newErisStream(store, cap))

type
  ErisIngest* = ref ErisIngestObj
  ErisIngestObj = object
    ## An object for ingesting data into a store.
    store*: ErisStore
    futPut: FuturePut
    tree: seq[seq[Pair]]
    secret*: Secret
    pos: BiggestUInt
    chunkSize: ChunkSize
    invalid: bool

proc buffer(ingest: ErisIngest): var seq[byte] {.inline.} = ingest.futPut.buffer

proc newErisIngest*(store: ErisStore; chunkSize = chunk32k; secret: Secret): ErisIngest =
  ## Create a new `ErisIngest` object.
  result = ErisIngest(
    store: store,
    futPut: newFuturePut(chunkSize),
    tree: newSeqOfCap[seq[Pair]](8),
    secret: secret,
    chunkSize: chunkSize)

proc newErisIngest*(store: ErisStore; chunkSize = chunk32k; mode = uniqueMode): ErisIngest =
  ## Create a new `ErisIngest` object. If `mode` is `uniqueMode` then a random
  ## convergence secret will be generated using entropy from the operating system.
  ## For `convergentMode` a zero-secret will be used and the encoding will be
  ## deterministic and reproducible.
  var secret: Secret
  if mode == uniqueMode: doAssert urandom(secret.bytes)
  newErisIngest(store, chunkSize, secret)

proc reinit*(ingest: ErisIngest) =
  ## Re-initialize an `ErisIngest` object.
  for nodes in ingest.tree.mitems: nodes.setLen(0)
  ingest.futPut.status = plaintext
  reset ingest.secret.bytes
  ingest.pos = 0
  ingest.invalid = false

proc reopen*(ingest: ErisIngest; cap: ErisCap) {.async.} =
  ## Re-open an `ErisIngest` for appending to an `ErisCap`.
  ingest.chunkSize = cap.chunkSize
  ingest.reinit()
  ingest.tree.setLen(succ cap.level)
  ingest.tree[cap.level].add(cap.pair)
  if cap.level > 0:
    for level in countdown(cap.level, TreeLevel 1):
      var
        pair = ingest.tree[level].pop
        blk = await get(ingest.store, pair, level, cap.chunkSize)
      for pair in blk.chunkPairs: ingest.tree[pred level].add(pair)
  let pair = ingest.tree[0].pop
  var
    blk = await get(ingest.store, pair, TreeLevel 0, cap.chunkSize)
    n = blk.unpaddedLen
  copyMem(addr ingest.buffer[0], addr blk[0], n)
  ingest.pos = ingest.pos + n.BiggestUInt

proc reopenErisIngest*(store: ErisStore; cap: ErisCap; secret: Secret): Future[ErisIngest] {.async.} =
  ## Re-open an `ErisCap` for appending.
  var ingest = newErisIngest(store, cap.chunkSize)
  await reopen(ingest, cap)
  ingest.secret = secret
  return ingest

proc reopenErisIngest*(store: ErisStore; cap: ErisCap; mode = uniqueMode): Future[ErisIngest] =
  ## Re-open an `ErisCap` for appending.
  var secret: Secret
  if mode == uniqueMode: doAssert urandom(secret.bytes)
  reopenErisIngest(store, cap, secret)

proc chunkSize*(ingest: ErisIngest): ChunkSize = ingest.chunkSize

proc position*(ingest: ErisIngest): BiggestUInt = ingest.pos
  ## Get the current append position of ``ingest``.
  ## This is the same as the number of bytes appended.

proc commitLevel(ingest: ErisIngest; level: TreeLevel): Future[void] {.gcsafe.}

proc commitBuffer(ingest: ErisIngest; level: TreeLevel) {.async.} =
  let pair =
    if level == 0: encryptLeafFuture(ingest.secret, ingest.futPut)
    else: encryptNodeFuture(level, ingest.futPut)
  var f = asFuture(ingest.futPut)
  put(ingest.store, ingest.futPut)
  await f
  ingest.futPut.status = plaintext
  if ingest.tree.len == level.int:
    ingest.tree.add(newSeqOfCap[Pair](ingest.chunkSize.arity))
  ingest.tree[level].add(pair)
  if ingest.tree[level].len == ingest.chunkSize.arity:
    await commitLevel(ingest, level)

proc commitLevel(ingest: ErisIngest; level: TreeLevel): Future[void] =
  var i: int
  for pair in ingest.tree[level]:
    copyMem(addr ingest.buffer[i+00], unsafeAddr pair.r.bytes[0], 32)
    copyMem(addr ingest.buffer[i+32], unsafeAddr pair.k.bytes[0], 32)
    inc(i, 64)
  if i < ingest.chunkSize.int:
    zeroMem(addr ingest.buffer[i], ingest.chunkSize.int - i)
  ingest.tree[level].setLen(0)
  commitBuffer(ingest, succ level)

proc append*(ingest: ErisIngest; data: string|seq[byte]) {.async.} =
  ## Ingest content.
  doAssert(not ingest.invalid)
  assertIdle ingest.futPut
  var dataOff = 0
  while dataOff < data.len:
    let
      blkOff = ingest.chunkSize.mask ingest.pos.int
      n = min(data.len-dataOff, ingest.chunkSize.int-blkOff)
    copyMem(ingest.buffer[blkOff].addr, data[dataOff].unsafeAddr, n)
    ingest.pos.inc n
    dataOff.inc n
    if (ingest.chunkSize.mask ingest.pos.int) == 0:
      await commitBuffer(ingest, 0)

proc append*(ingest: ErisIngest; stream: Stream) {.async.} =
  ## Ingest content from a `Stream`.
  assertIdle ingest.futPut
  while not stream.atEnd:
    var
      blkOff = ingest.chunkSize.mask ingest.pos.int
      n = ingest.chunkSize.int - blkOff
    n = readData(stream, ingest.buffer[blkOff].addr, n)
    if n == 0: break
    ingest.pos.inc n
    if (ingest.chunkSize.mask ingest.pos.int) == 0:
      await commitBuffer(ingest, 0)

proc padToNextBlock*(ingest: ErisIngest; pad = 0x80'u8): Future[void] =
  ## Pad the ingest stream with `pad` bytes until the start of the next
  ## chunk; the final byte of the chunk is always `0x80`.
  let chunkOff = ingest.chunkSize.mask ingest.pos.int
  for i in chunkOff..<ingest.buffer.high: ingest.buffer[i] = pad
  ingest.buffer[ingest.buffer.high] = 0x80
  ingest.pos = ((ingest.pos div ingest.chunkSize) + 1) * ingest.chunkSize
  commitBuffer(ingest, 0)

proc cap*(ingest: ErisIngest): Future[ErisCap] {.async.} =
  ## Derive the ``ErisCap`` of ``ingest``.
  ## The state of `ingest` is afterwards invalid until `reinit` or
  ## `reopen` is called.
  assertIdle ingest.futPut
  var cap = ErisCap(chunkSize: ingest.chunkSize)
  let padOff = ingest.chunkSize.mask ingest.pos.int
  ingest.buffer.setLen(padOff)
  ingest.buffer.setLen(ingest.chunkSize.int) # zero the "fresh" buffer space
  ingest.buffer[padOff] = 0x80
  await commitBuffer(ingest, 0)
  for level in 0..255:
    if ingest.tree.high == level and ingest.tree[level].len == 1:
      cap.pair = pop ingest.tree[level]
      cap.level = uint8 level
      break
    else:
      if ingest.tree.len > 0 and ingest.tree[level].len > 0:
        await commitLevel(ingest, TreeLevel level)
  ingest.invalid = true
  return cap
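
## A sketch of incremental ingestion; the convergent encoding of
## "Hello world!" matches the single-shot `erisCap` example below:

runnableExamples:
  import eris/memory_stores
  import std/asyncdispatch

  let
    store = newMemoryStore()
    ingest = newErisIngest(store, chunk1k, convergentMode)
  waitFor ingest.append("Hello ")
  waitFor ingest.append("world!")
  let cap = waitFor ingest.cap
  assert ingest.position == 12
  assert $cap == "urn:eris:BIAD77QDJMFAKZYH2DXBUZYAP3MXZ3DJZVFYQ5DFWC6T65WSFCU5S2IT4YZGJ7AC4SYQMP2DM2ANS2ZTCP3DJJIRV733CRAAHOSWIYZM3M"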

proc encode*(store; chunkSize: ChunkSize; content: Stream; secret: Secret):
    Future[(ErisCap, uint64)] {.async.} =
  ## Asynchronously encode ``content`` into ``store`` and derive its ``ErisCap``.
  let ingest = newErisIngest(store, chunkSize, secret)
  await ingest.append(content)
  let cap = await ingest.cap
  return (cap, ingest.position)

proc encode*(store; chunkSize: ChunkSize; content: Stream; mode = uniqueMode): Future[(ErisCap, uint64)] =
  ## Asynchronously encode ``content`` into ``store`` and derive its ``ErisCap``.
  var secret: Secret
  if mode == uniqueMode:
    doAssert urandom(secret.bytes)
  encode(store, chunkSize, content, secret)

proc encode*(store; content: Stream; mode = uniqueMode): Future[(ErisCap, uint64)] {.async.} =
  ## Asynchronously encode ``content`` into ``store`` and derive its ``ErisCap``.
  ## The chunk size is 1KiB unless the content is at least 16KiB.
  var
    initialRead = content.readStr(chunk32k.int)
    chunkSize = recommendedChunkSize initialRead.len
    ingest = newErisIngest(store, chunkSize, mode)
  await ingest.append(initialRead)
  reset initialRead
  await ingest.append(content)
  let cap = await ingest.cap
  return (cap, ingest.position)

proc encode*(store; chunkSize: ChunkSize; content: string; mode = uniqueMode): Future[(ErisCap, uint64)] =
  ## Asynchronously encode ``content`` into ``store`` and derive its ``ErisCap``.
  encode(store, chunkSize, newStringStream(content), mode)

proc erisCap*(content: string; chunkSize: ChunkSize): ErisCap =
  ## Derive a convergent ``ErisCap`` for ``content``.
  runnableExamples:
    assert:
      $erisCap("Hello world!", chunk1k) ==
        "urn:eris:BIAD77QDJMFAKZYH2DXBUZYAP3MXZ3DJZVFYQ5DFWC6T65WSFCU5S2IT4YZGJ7AC4SYQMP2DM2ANS2ZTCP3DJJIRV733CRAAHOSWIYZM3M"

  var
    store = newDiscardStore()
    n: uint64
  (result, n) = waitFor encode(store, chunkSize, newStringStream(content), convergentMode)
  # DiscardStore will complete this immediately

type
  Collector = ref object
    store: ErisStore
    set: HashSet[Reference]
    chunkSize: ChunkSize

proc collect(col: Collector; pair: Pair; level: TreeLevel; getAll: bool) {.async.} =
  assert level > 0
  var futures: seq[Future[void]]
  var blk = await get(col.store, pair, level, col.chunkSize)
  for pair in blk.chunkPairs:
    if pair.r notin col.set:
      col.set.incl pair.r
      if level > 1:
        futures.add collect(col, pair, level.pred, getAll)
      elif getAll:
        var blk = newFutureGet(pair.r, col.chunkSize)
        futures.add asFuture(blk)
        get(col.store, pair.r, blk)
  await all(futures)

proc references*(store: ErisStore; cap: ErisCap): Future[HashSet[Reference]] {.async.} =
  ## Collect the set of `Reference`s that constitute an `ErisCap`.
  if cap.level == 0:
    return [cap.pair.r].toHashSet
  else:
    var col = Collector(store: store, chunkSize: cap.chunkSize)
    await collect(col, cap.pair, cap.level, false)
    col.set.incl cap.pair.r
    return col.set
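
## A sketch: for a single-chunk capability the set contains only the
## root reference (assuming `eris/memory_stores`):

runnableExamples:
  import eris/memory_stores
  import std/[asyncdispatch, sets, streams]

  let
    store = newMemoryStore()
    (cap, _) = waitFor encode(store, newStringStream("Hail ERIS!"))
    refs = waitFor references(store, cap)
  assert refs == [cap.pair.r].toHashSet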

proc getAll*(store: ErisStore; cap: ErisCap): Future[void] =
  ## Get all chunks that constitute `cap` from `store`.
  ## No data is returned; this procedure ensures
  ## that all chunks are present at some store.
  if cap.level == 0:
    var blk = newFutureGet(cap.pair.r, cap.chunkSize)
    result = asFuture(blk)
    get(store, cap.pair.r, blk)
  else:
    var col = Collector(store: store, chunkSize: cap.chunkSize)
    result = collect(col, cap.pair, cap.level, true)