@@ -518,6 +518,70 @@ func TestClientCloseUnsubscribeRace(t *testing.T) {
518518 }
519519}
520520
521+ // unsubscribeBlocker will wait for the quit channel to process an unsubscribe
522+ // request.
523+ type unsubscribeBlocker struct {
524+ ServerCodec
525+ quit chan struct {}
526+ }
527+
528+ func (b * unsubscribeBlocker ) readBatch () ([]* jsonrpcMessage , bool , error ) {
529+ msgs , batch , err := b .ServerCodec .readBatch ()
530+ for _ , msg := range msgs {
531+ if msg .isUnsubscribe () {
532+ <- b .quit
533+ }
534+ }
535+ return msgs , batch , err
536+ }
537+
538+ // TestUnsubscribeTimeout verifies that calling the client's Unsubscribe
539+ // function will eventually timeout and not block forever in case the serve does
540+ // not respond.
541+ // It reproducers the issue https://github.com/ethereum/go-ethereum/issues/30156
542+ func TestUnsubscribeTimeout (t * testing.T ) {
543+ srv := NewServer ()
544+ srv .RegisterName ("nftest" , new (notificationTestService ))
545+
546+ // Setup middleware to block on unsubscribe.
547+ p1 , p2 := net .Pipe ()
548+ blocker := & unsubscribeBlocker {ServerCodec : NewCodec (p1 ), quit : make (chan struct {})}
549+ defer close (blocker .quit )
550+
551+ // Serve the middleware.
552+ go srv .ServeCodec (blocker , OptionMethodInvocation | OptionSubscriptions )
553+ defer srv .Stop ()
554+
555+ // Create the client on the other end of the pipe.
556+ cfg := new (clientConfig )
557+ client , _ := newClient (context .Background (), cfg , func (context.Context ) (ServerCodec , error ) {
558+ return NewCodec (p2 ), nil
559+ })
560+ defer client .Close ()
561+
562+ // Start subscription.
563+ sub , err := client .Subscribe (context .Background (), "nftest" , make (chan int ), "someSubscription" , 1 , 1 )
564+ if err != nil {
565+ t .Fatalf ("failed to subscribe: %v" , err )
566+ }
567+
568+ // Now on a separate thread, attempt to unsubscribe. Since the middleware
569+ // won't return, the function will only return if it times out on the request.
570+ done := make (chan struct {})
571+ go func () {
572+ sub .Unsubscribe ()
573+ done <- struct {}{}
574+ }()
575+
576+ // Wait for the timeout. If the expected time for the timeout elapses, the
577+ // test is considered failed.
578+ select {
579+ case <- done :
580+ case <- time .After (unsubscribeTimeout + 3 * time .Second ):
581+ t .Fatalf ("Unsubscribe did not return within %s" , unsubscribeTimeout )
582+ }
583+ }
584+
521585// unsubscribeRecorder collects the subscription IDs of *_unsubscribe calls.
522586type unsubscribeRecorder struct {
523587 ServerCodec
0 commit comments