-
Notifications
You must be signed in to change notification settings - Fork 53
Raise tracking support for chronos procedures #298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
473a33c
45d7ba8
896df6b
fde7649
8f765df
0c8d9c1
3f53d01
f8a55ae
1e8285b
b9c229c
6589cd2
8c36af8
c6075f8
69fcf74
72e7956
0f496db
79fa1ca
e12147c
e6faffd
673d03f
0f46a06
05e94c8
28ca5de
f93409a
b09d501
c3f8882
d91151a
b9a20be
9883fd2
20716d2
d5eed32
6c84b6f
e69d528
046722e
148107e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |||||
| import std/[os, tables, strutils, heapqueue, options, deques, sequtils] | ||||||
| import stew/base10 | ||||||
| import ./srcloc | ||||||
| import macros | ||||||
| export srcloc | ||||||
|
|
||||||
| when defined(nimHasStacktracesModule): | ||||||
|
|
@@ -60,6 +61,12 @@ type | |||||
| closure*: iterator(f: Future[T]): FutureBase {.raises: [Defect, CatchableError, Exception], gcsafe.} | ||||||
| value: T ## Stored value | ||||||
|
|
||||||
| RaiseTrackingFuture*[T, E] = ref object of Future[T] | ||||||
| ## Future with a tuple of possible exception types | ||||||
| ## eg RaiseTrackingFuture[void, (ValueError, OSError)] | ||||||
| ## Will be injected by `asyncraises`, should generally | ||||||
| ## not be used manually | ||||||
|
|
||||||
| FutureStr*[T] = ref object of Future[T] | ||||||
| ## Future to hold GC strings | ||||||
| gcholder*: string | ||||||
|
|
@@ -109,6 +116,9 @@ template setupFutureBase(loc: ptr SrcLoc) = | |||||
| proc newFutureImpl[T](loc: ptr SrcLoc): Future[T] = | ||||||
| setupFutureBase(loc) | ||||||
|
|
||||||
| proc newRaiseTrackingFutureImpl[T, E](loc: ptr SrcLoc): RaiseTrackingFuture[T, E] = | ||||||
| setupFutureBase(loc) | ||||||
|
|
||||||
| proc newFutureSeqImpl[A, B](loc: ptr SrcLoc): FutureSeq[A, B] = | ||||||
| setupFutureBase(loc) | ||||||
|
|
||||||
|
|
@@ -122,6 +132,19 @@ template newFuture*[T](fromProc: static[string] = ""): Future[T] = | |||||
| ## that this future belongs to, is a good habit as it helps with debugging. | ||||||
| newFutureImpl[T](getSrcLocation(fromProc)) | ||||||
|
|
||||||
| macro getFutureExceptions(T: typedesc): untyped = | ||||||
| if getTypeInst(T)[1].len > 2: | ||||||
| getTypeInst(T)[1][2] | ||||||
| else: | ||||||
| ident"void" | ||||||
|
|
||||||
| template newRaiseTrackingFuture*[T](fromProc: static[string] = ""): auto = | ||||||
| ## Creates a new future. | ||||||
| ## | ||||||
| ## Specifying ``fromProc``, which is a string specifying the name of the proc | ||||||
| ## that this future belongs to, is a good habit as it helps with debugging. | ||||||
| newRaiseTrackingFutureImpl[T, getFutureExceptions(typeof(result))](getSrcLocation(fromProc)) | ||||||
|
|
||||||
| template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] = | ||||||
| ## Create a new future which can hold/preserve GC sequence until future will | ||||||
| ## not be completed. | ||||||
|
|
@@ -251,6 +274,49 @@ template fail*[T](future: Future[T], error: ref CatchableError) = | |||||
| ## Completes ``future`` with ``error``. | ||||||
| fail(future, error, getSrcLocation()) | ||||||
|
|
||||||
| macro checkFailureType(future, error: typed): untyped = | ||||||
| let e = getTypeInst(future)[2] | ||||||
| let types = getType(e) | ||||||
|
|
||||||
| if types.eqIdent("void"): | ||||||
| error("Can't raise exceptions on this Future") | ||||||
|
|
||||||
| expectKind(types, nnkBracketExpr) | ||||||
| expectKind(types[0], nnkSym) | ||||||
| assert types[0].strVal == "tuple" | ||||||
| assert types.len > 1 | ||||||
|
|
||||||
| expectKind(getTypeInst(error), nnkRefTy) | ||||||
| let toMatch = getTypeInst(error)[0] | ||||||
|
|
||||||
| # Can't find a way to check `is` in the macro. (sameType doesn't | ||||||
| # work for inherited objects). Dirty hack here, for [IOError, OSError], | ||||||
| # this will generate: | ||||||
| # | ||||||
| # static: | ||||||
| # if not((`toMatch` is IOError) or (`toMatch` is OSError) | ||||||
| # or (`toMatch` is CancelledError) or false): | ||||||
| # raiseAssert("Can't fail with `toMatch`, only [IOError, OSError] is allowed") | ||||||
| var typeChecker = ident"false" | ||||||
|
|
||||||
| for errorType in types[1..^1]: | ||||||
| typeChecker = newCall("or", typeChecker, newCall("is", toMatch, errorType)) | ||||||
| typeChecker = newCall( | ||||||
| "or", typeChecker, | ||||||
| newCall("is", toMatch, ident"CancelledError")) | ||||||
|
|
||||||
| let errorMsg = "Can't fail with " & repr(toMatch) & ". Only " & repr(types[1..^1]) & " allowed" | ||||||
|
|
||||||
| result = nnkStaticStmt.newNimNode(lineInfoFrom=error).add( | ||||||
| quote do: | ||||||
| if not(`typeChecker`): | ||||||
| raiseAssert(`errorMsg`) | ||||||
| ) | ||||||
|
|
||||||
| template fail*[T, E](future: RaiseTrackingFuture[T, E], error: ref CatchableError) = | ||||||
| checkFailureType(future, error) | ||||||
| fail(future, error, getSrcLocation()) | ||||||
|
|
||||||
| template newCancelledError(): ref CancelledError = | ||||||
| (ref CancelledError)(msg: "Future operation cancelled!") | ||||||
|
|
||||||
|
|
@@ -505,18 +571,65 @@ proc internalRead*[T](fut: Future[T]): T {.inline.} = | |||||
| when T isnot void: | ||||||
| return fut.value | ||||||
|
|
||||||
| proc read*[T](future: Future[T] ): T {. | ||||||
| raises: [Defect, CatchableError].} = | ||||||
| macro checkFutureExceptions*(f, typ: typed): untyped = | ||||||
| # For RaiseTrackingFuture[void, (ValueError, OSError), will do: | ||||||
| # if isNil(f.error): discard | ||||||
| # elif f.error of type ValueError: raise (ref ValueError)(f.error) | ||||||
| # elif f.error of type OSError: raise (ref OSError)(f.error) | ||||||
| # else: raiseAssert("Unhandled future exception: " & f.error.msg) | ||||||
| # | ||||||
| # In future nim versions, this could simply be | ||||||
| # {.cast(raises: [ValueError, OSError]).}: | ||||||
| # raise f.error | ||||||
| let e = getTypeInst(typ)[2] | ||||||
| let types = getType(e) | ||||||
|
|
||||||
| if types.eqIdent("void"): | ||||||
| return quote do: | ||||||
| if not(isNil(`f`.error)): | ||||||
| raiseAssert("Unhandled future exception: " & `f`.error.msg) | ||||||
|
|
||||||
| expectKind(types, nnkBracketExpr) | ||||||
| expectKind(types[0], nnkSym) | ||||||
| assert types[0].strVal == "tuple" | ||||||
| assert types.len > 1 | ||||||
|
|
||||||
| result = nnkIfExpr.newTree( | ||||||
| nnkElifExpr.newTree( | ||||||
| quote do: isNil(`f`.error), | ||||||
| quote do: discard | ||||||
| ) | ||||||
| ) | ||||||
|
|
||||||
| for errorType in types[1..^1]: | ||||||
| result.add nnkElifExpr.newTree( | ||||||
| quote do: `f`.error of type `errorType`, | ||||||
| nnkRaiseStmt.newNimNode(lineInfoFrom=typ).add( | ||||||
| quote do: (ref `errorType`)(`f`.error) | ||||||
| ) | ||||||
| ) | ||||||
|
|
||||||
| result.add nnkElseExpr.newTree( | ||||||
| quote do: raiseAssert("Unhandled future exception: " & `f`.error.msg) | ||||||
| ) | ||||||
|
|
||||||
| # we have to rely on inference here | ||||||
| {.pop.} | ||||||
| proc read*(future: Future | RaiseTrackingFuture ): auto = | ||||||
| ## Retrieves the value of ``future``. Future must be finished otherwise | ||||||
| ## this function will fail with a ``ValueError`` exception. | ||||||
| ## | ||||||
| ## If the result of the future is an error then that error will be raised. | ||||||
| if future.finished(): | ||||||
| internalCheckComplete(future) | ||||||
| when future is RaiseTrackingFuture: | ||||||
| checkFutureExceptions(future, future) | ||||||
| else: | ||||||
| internalCheckComplete(future) | ||||||
| internalRead(future) | ||||||
| else: | ||||||
| # TODO: Make a custom exception type for this? | ||||||
| raise newException(ValueError, "Future still in progress.") | ||||||
| {.push raises: [Defect].} | ||||||
|
|
||||||
| proc readError*[T](future: Future[T]): ref CatchableError {. | ||||||
| raises: [Defect, ValueError].} = | ||||||
|
|
@@ -686,6 +799,16 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = | |||||
| retFuture.cancelCallback = cancellation | ||||||
| return retFuture | ||||||
|
|
||||||
| proc `or`*[T, E1, Y, E2](fut1: RaiseTrackingFuture[T, E1], fut2: RaiseTrackingFuture[Y, E2]): auto = | ||||||
| macro concatError(e1, e2: typed): untyped = | ||||||
| result = nnkTupleConstr.newTree() | ||||||
| for err in getTypeInst(e1)[1]: | ||||||
| result.add err | ||||||
| for err in getTypeInst(e2)[1]: | ||||||
| result.add err | ||||||
| #TODO is this really safe? | ||||||
| return cast[RaiseTrackingFuture[void, concatError(E1, E2)]](Future[T](fut1) or Future[Y](fut2)) | ||||||
|
|
||||||
| proc all*[T](futs: varargs[Future[T]]): auto {. | ||||||
| deprecated: "Use allFutures(varargs[Future[T]])".} = | ||||||
| ## Returns a future which will complete once all futures in ``futs`` complete. | ||||||
|
|
@@ -823,13 +946,13 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {. | |||||
|
|
||||||
| return retFuture | ||||||
|
|
||||||
| proc cancelAndWait*[T](fut: Future[T]): Future[void] = | ||||||
| proc cancelAndWait*[T](fut: Future[T]): RaiseTrackingFuture[void, (CancelledError,)] = | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
the rewrite to RTF should be macro-handled, no?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't have access to macros here (asyncmacro2 requires asyncfutures2)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we should move this one elsewhere then - ie |
||||||
| ## Initiate cancellation process for Future ``fut`` and wait until ``fut`` is | ||||||
| ## done e.g. changes its state (become completed, failed or cancelled). | ||||||
| ## | ||||||
| ## If ``fut`` is already finished (completed, failed or cancelled) result | ||||||
| ## Future[void] object will be returned complete. | ||||||
| var retFuture = newFuture[void]("chronos.cancelAndWait(T)") | ||||||
| var retFuture = newRaiseTrackingFuture[void]("chronos.cancelAndWait(T)") | ||||||
| proc continuation(udata: pointer) = | ||||||
| if not(retFuture.finished()): | ||||||
| retFuture.complete() | ||||||
|
|
@@ -845,14 +968,14 @@ proc cancelAndWait*[T](fut: Future[T]): Future[void] = | |||||
| fut.cancel() | ||||||
| return retFuture | ||||||
|
|
||||||
| proc allFutures*[T](futs: varargs[Future[T]]): Future[void] = | ||||||
| proc allFutures*[T](futs: varargs[Future[T]]): RaiseTrackingFuture[void, (CancelledError,)] = | ||||||
| ## Returns a future which will complete only when all futures in ``futs`` | ||||||
| ## will be completed, failed or canceled. | ||||||
| ## | ||||||
| ## If the argument is empty, the returned future COMPLETES immediately. | ||||||
| ## | ||||||
| ## On cancel all the awaited futures ``futs`` WILL NOT BE cancelled. | ||||||
| var retFuture = newFuture[void]("chronos.allFutures()") | ||||||
| var retFuture = newRaiseTrackingFuture[void]("chronos.allFutures()") | ||||||
| let totalFutures = len(futs) | ||||||
| var completedFutures = 0 | ||||||
|
|
||||||
|
|
@@ -884,6 +1007,7 @@ proc allFutures*[T](futs: varargs[Future[T]]): Future[void] = | |||||
| return retFuture | ||||||
|
|
||||||
| proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] = | ||||||
| #TODO how to track exceptions here? | ||||||
| ## Returns a future which will complete only when all futures in ``futs`` | ||||||
| ## will be completed, failed or canceled. | ||||||
| ## | ||||||
|
|
@@ -924,6 +1048,7 @@ proc allFinished*[T](futs: varargs[Future[T]]): Future[seq[Future[T]]] = | |||||
| return retFuture | ||||||
|
|
||||||
| proc one*[T](futs: varargs[Future[T]]): Future[Future[T]] = | ||||||
| #TODO how to track exceptions here? | ||||||
| ## Returns a future which will complete and return completed Future[T] inside, | ||||||
| ## when one of the futures in ``futs`` will be completed, failed or canceled. | ||||||
| ## | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -901,10 +901,12 @@ when not(defined(windows)): | |
| retFuture.cancelCallback = cancellation | ||
| retFuture | ||
|
|
||
| proc sleepAsync*(duration: Duration): Future[void] = | ||
| include asyncmacro2 | ||
|
|
||
| proc sleepAsync*(duration: Duration): Future[void] {.asyncraises: [CancelledError].} = | ||
| ## Suspends the execution of the current async procedure for the next | ||
| ## ``duration`` time. | ||
| var retFuture = newFuture[void]("chronos.sleepAsync(Duration)") | ||
| var retFuture = newRaiseTrackingFuture[void]("chronos.sleepAsync(Duration)") | ||
| let moment = Moment.fromNow(duration) | ||
| var timer: TimerCallback | ||
|
|
||
|
|
@@ -924,7 +926,7 @@ proc sleepAsync*(ms: int): Future[void] {. | |
| inline, deprecated: "Use sleepAsync(Duration)".} = | ||
| result = sleepAsync(ms.milliseconds()) | ||
|
|
||
| proc stepsAsync*(number: int): Future[void] = | ||
| proc stepsAsync*(number: int): Future[void] {.asyncraises: [CancelledError].} = | ||
| ## Suspends the execution of the current async procedure for the next | ||
| ## ``number`` of asynchronous steps (``poll()`` calls). | ||
| ## | ||
|
|
@@ -934,7 +936,7 @@ proc stepsAsync*(number: int): Future[void] = | |
| ## WARNING! Do not use this primitive to perform switch between tasks, because | ||
| ## this can lead to 100% CPU load in the moments when there are no I/O | ||
| ## events. Usually when there no I/O events CPU consumption should be near 0%. | ||
| var retFuture = newFuture[void]("chronos.stepsAsync(int)") | ||
| var retFuture = newRaiseTrackingFuture[void]("chronos.stepsAsync(int)") | ||
| var counter = 0 | ||
|
|
||
| var continuation: proc(data: pointer) {.gcsafe, raises: [Defect].} | ||
|
|
@@ -957,12 +959,12 @@ proc stepsAsync*(number: int): Future[void] = | |
|
|
||
| retFuture | ||
|
|
||
| proc idleAsync*(): Future[void] = | ||
| proc idleAsync*(): Future[void] {.asyncraises: [CancelledError].} = | ||
| ## Suspends the execution of the current asynchronous task until "idle" time. | ||
| ## | ||
| ## "idle" time its moment of time, when no network events were processed by | ||
| ## ``poll()`` call. | ||
| var retFuture = newFuture[void]("chronos.idleAsync()") | ||
| var retFuture = newRaiseTrackingFuture[void]("chronos.idleAsync()") | ||
|
|
||
| proc continuation(data: pointer) {.gcsafe.} = | ||
| if not(retFuture.finished()): | ||
|
|
@@ -975,14 +977,14 @@ proc idleAsync*(): Future[void] = | |
| callIdle(continuation, nil) | ||
| retFuture | ||
|
|
||
| proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = | ||
| proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] {.asyncraises: [CancelledError].} = | ||
| ## Returns a future which will complete once ``fut`` completes or after | ||
| ## ``timeout`` milliseconds has elapsed. | ||
| ## | ||
| ## If ``fut`` completes first the returned future will hold true, | ||
| ## otherwise, if ``timeout`` milliseconds has elapsed first, the returned | ||
| ## future will hold false. | ||
| var retFuture = newFuture[bool]("chronos.`withTimeout`") | ||
| var retFuture = newRaiseTrackingFuture[bool]("chronos.`withTimeout`") | ||
| var moment: Moment | ||
| var timer: TimerCallback | ||
| var cancelling = false | ||
|
|
@@ -1100,6 +1102,15 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = | |
|
|
||
| return retFuture | ||
|
|
||
| proc wait*[T, E](fut: RaiseTrackingFuture[T, E], timeout = InfiniteDuration): auto = | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what's the return type here? in public functions, we ideally want to avoid auto both for documentation reasons and because it serves as a type inference guide for the compiler.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's RTF[T, E + CanceledError] (which I just realized is wrong, should be AsyncTimeoutError) I don't know a better solution than auto for this kind of procedures (but I don't think it has an impact on type inference) |
||
| macro concatError(e1: typed): untyped = | ||
| result = nnkTupleConstr.newTree() | ||
| result.add ident"CancelledError" | ||
| for err in getTypeInst(e1)[1]: | ||
| result.add err | ||
| #TODO is this really safe? | ||
| return cast[RaiseTrackingFuture[void, concatError(E)]](Future[T](fut).wait(timeout)) | ||
|
|
||
| proc wait*[T](fut: Future[T], timeout = -1): Future[T] {. | ||
| inline, deprecated: "Use wait(Future[T], Duration)".} = | ||
| if timeout == -1: | ||
|
|
@@ -1109,8 +1120,6 @@ proc wait*[T](fut: Future[T], timeout = -1): Future[T] {. | |
| else: | ||
| wait(fut, timeout.milliseconds()) | ||
|
|
||
| include asyncmacro2 | ||
|
|
||
| proc runForever*() {.raises: [Defect, CatchableError].} = | ||
| ## Begins a never ending global dispatcher poll loop. | ||
| ## Raises different exceptions depending on the platform. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just make a separate read function for RTF? the
autoreturn type here is unnecessarily wide as well, there's too many compromises for too little benefitThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, we can avoid it here by making a specialized read for RTF