Skip to content

Commit 69c9fca

Browse files
committed
Update
1 parent 44972ab commit 69c9fca

File tree

6 files changed

+53
-121
lines changed

6 files changed

+53
-121
lines changed

Sources/Verge/Library/EventEmitter.swift

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121

2222
import Atomics
2323
import Combine
24+
import DequeModule
2425
import Foundation
2526
import os
26-
import DequeModule
2727

2828
public final class EventEmitterCancellable: Hashable, Cancellable, @unchecked Sendable {
2929

@@ -57,11 +57,12 @@ public protocol EventEmitterEventType {
5757
/// Instead of Combine
5858
open class EventEmitter<Event: EventEmitterEventType>: EventEmitterType, @unchecked Sendable {
5959

60-
public var publisher: some Publisher<Event, Never> {
61-
self
60+
public var publisher: Publisher {
61+
return .init(eventEmitter: self)
6262
}
6363

64-
private var subscribers: VergeConcurrency.UnfairLockAtomic<[EventEmitterCancellable : (Event) -> Void]> = .init([:])
64+
private var subscribers:
65+
VergeConcurrency.UnfairLockAtomic<[EventEmitterCancellable: (Event) -> Void]> = .init([:])
6566

6667
private let queue: VergeConcurrency.UnfairLockAtomic<Deque<Event>> = .init(.init())
6768

@@ -70,7 +71,6 @@ open class EventEmitter<Event: EventEmitterEventType>: EventEmitterType, @unchec
7071
private var deinitHandlers: VergeConcurrency.UnfairLockAtomic<[() -> Void]> = .init([])
7172

7273
public init() {
73-
7474
}
7575

7676
deinit {
@@ -106,10 +106,10 @@ open class EventEmitter<Event: EventEmitterEventType>: EventEmitterType, @unchec
106106
return nil
107107
}
108108
}) {
109-
109+
110110
// Emits
111111
receiveEvent(event)
112-
112+
113113
for subscriber in capturedSubscribers {
114114
vergeSignpostEvent("EventEmitter.emitForSubscriber")
115115
subscriber.1(event)
@@ -128,14 +128,15 @@ open class EventEmitter<Event: EventEmitterEventType>: EventEmitterType, @unchec
128128
}
129129

130130
}
131-
131+
132132
open func receiveEvent(_ event: consuming Event) {
133133

134134
}
135135

136136
@_spi(EventEmitter)
137137
@discardableResult
138-
public func addEventHandler(_ eventReceiver: @escaping (Event) -> Void) -> EventEmitterCancellable {
138+
public func addEventHandler(_ eventReceiver: @escaping (Event) -> Void) -> EventEmitterCancellable
139+
{
139140
let token = EventEmitterCancellable(owner: self)
140141
subscribers.modify {
141142
$0[token] = eventReceiver
@@ -157,7 +158,7 @@ open class EventEmitter<Event: EventEmitterEventType>: EventEmitterType, @unchec
157158
// then unfair-lock raises runtime error.
158159
withExtendedLifetime(itemToRemove, {})
159160
}
160-
161+
161162
public func onDeinit(_ onDeinit: @escaping () -> Void) {
162163
deinitHandlers.modify {
163164
$0.append(onDeinit)
@@ -166,28 +167,29 @@ open class EventEmitter<Event: EventEmitterEventType>: EventEmitterType, @unchec
166167

167168
}
168169

169-
extension EventEmitter: Publisher {
170-
171-
public typealias Output = Event
170+
extension EventEmitter {
172171

173-
public typealias Failure = Never
172+
@available(iOS 13, macOS 10.15, tvOS 13, watchOS 6, *)
173+
public struct Publisher: Combine.Publisher {
174174

175-
public func receive<S>(
176-
subscriber: S
177-
)
178-
where S: Subscriber, Failure == S.Failure, Output == S.Input {
175+
public typealias Output = Event
179176

180-
let subscription = Subscription<S>(
181-
subscriber: subscriber,
182-
eventEmitter: self
183-
)
184-
185-
subscriber.receive(subscription: subscription)
186-
}
187-
188-
}
177+
public typealias Failure = Never
189178

190-
extension EventEmitter {
179+
private weak var eventEmitter: EventEmitter<Event>?
180+
181+
public init(eventEmitter: EventEmitter<Event>) {
182+
self.eventEmitter = eventEmitter
183+
}
184+
185+
public func receive<S>(subscriber: S)
186+
where S: Subscriber, Failure == S.Failure, Output == S.Input {
187+
188+
let subscription = Subscription<S>(subscriber: subscriber, eventEmitter: eventEmitter)
189+
subscriber.receive(subscription: subscription)
190+
}
191+
192+
}
191193

192194
@available(iOS 13, macOS 10.15, tvOS 13, watchOS 6, *)
193195
public struct Subscription<S: Subscriber>: Combine.Subscription where S.Input == Event {
@@ -198,22 +200,15 @@ extension EventEmitter {
198200
private let eventEmitterSubscription: EventEmitterCancellable?
199201
private weak var eventEmitter: EventEmitter<Event>?
200202

201-
init(
202-
subscriber: S,
203-
eventEmitter: EventEmitter<Event>?
204-
) {
203+
init(subscriber: S, eventEmitter: EventEmitter<Event>?) {
205204

206205
self.subscriber = subscriber
207206
self.eventEmitter = eventEmitter
208-
209-
eventEmitter?.onDeinit {
210-
subscriber.receive(completion: .finished)
211-
}
212-
207+
213208
self.eventEmitterSubscription = eventEmitter?
214209
.addEventHandler { (event) in
215210
_ = subscriber.receive(event)
216-
}
211+
}
217212
}
218213

219214
public func request(_ demand: Subscribers.Demand) {

Sources/Verge/Store/Store+Combine.swift

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
2020
// THE SOFTWARE.
2121

22-
#if canImport(Combine)
23-
2422
import Combine
2523

2624
@available(iOS 13, macOS 10.15, tvOS 13, watchOS 6, *)
@@ -35,16 +33,25 @@ extension Store {
3533
@_spi(Package)
3634
public func _statePublisher() -> some Combine.Publisher<Changes<Value>, Never> {
3735

38-
return valuePublisher
39-
.dropFirst()
36+
return
37+
publisher
4038
.associate(resource: self, retains: keepsAliveForSubscribers)
39+
.flatMap { event in
40+
guard case .state(.didUpdate(let state)) = event else {
41+
return Empty<Changes<Value>, Never>().eraseToAnyPublisher()
42+
}
43+
return Just<Changes<Value>>(state)
44+
.eraseToAnyPublisher()
45+
}
4146
.merge(with: Just(state.droppedPrevious()))
47+
4248
}
4349

44-
// @_spi(Package)
50+
// @_spi(Package)
4551
public func _activityPublisher() -> some Combine.Publisher<Activity, Never> {
4652

47-
return publisher
53+
return
54+
publisher
4855
.associate(resource: self, retains: keepsAliveForSubscribers)
4956
.flatMap { event in
5057
guard case .activity(let a) = event else {
@@ -70,7 +77,7 @@ extension Publisher {
7077

7178
}
7279

73-
fileprivate final class ResourceBox {
80+
private final class ResourceBox {
7481

7582
private let object: AnyObject?
7683

@@ -82,5 +89,3 @@ fileprivate final class ResourceBox {
8289
}
8390
}
8491
}
85-
86-
#endif

Sources/Verge/Store/Store.swift

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,6 @@ open class Store<State, Activity: Sendable>: EventEmitter<_StoreEvent<State, Act
125125

126126
public let sanitizer: RuntimeSanitizer
127127

128-
public var valuePublisher: some Combine.Publisher<Changes<State>, Never> {
129-
return _valueSubject
130-
}
131-
132128
private var middlewares: [AnyStoreMiddleware<State>] = []
133129

134130
private let externalOperation: @Sendable (inout InoutRef<State>, Changes<State>, inout Transaction) -> Void
@@ -137,8 +133,6 @@ open class Store<State, Activity: Sendable>: EventEmitter<_StoreEvent<State, Act
137133

138134
private let _lock: StoreOperation
139135

140-
private let _valueSubject: CurrentValueSubject<Changes<State>, Never>
141-
142136
/**
143137
Holds subscriptions for sink State and Activity to finish them with its store life-cycle.
144138
*/
@@ -180,9 +174,6 @@ open class Store<State, Activity: Sendable>: EventEmitter<_StoreEvent<State, Act
180174
self.nonatomicValue = .init(old: nil, new: initialState)
181175
self._lock = storeOperation
182176

183-
// TODO: copying value
184-
self._valueSubject = .init(nonatomicValue)
185-
186177
self.logger = logger
187178
self.sanitizer = sanitizer ?? RuntimeSanitizer.global
188179
self.name = name ?? "\(file):\(line)"
@@ -215,8 +206,6 @@ open class Store<State, Activity: Sendable>: EventEmitter<_StoreEvent<State, Act
215206

216207
self.nonatomicValue = .init(old: nil, new: reduced)
217208
self._lock = storeOperation
218-
// TODO: copying value
219-
self._valueSubject = .init(nonatomicValue)
220209

221210
self.logger = logger
222211
self.sanitizer = sanitizer ?? RuntimeSanitizer.global
@@ -249,7 +238,6 @@ open class Store<State, Activity: Sendable>: EventEmitter<_StoreEvent<State, Act
249238
case .willUpdate:
250239
break
251240
case .didUpdate(let state):
252-
_valueSubject.send(state)
253241
stateDidUpdate(newState: state)
254242
}
255243
case .activity:
@@ -277,11 +265,7 @@ open class Store<State, Activity: Sendable>: EventEmitter<_StoreEvent<State, Act
277265

278266
storeLifeCycleCancellable.cancel()
279267

280-
Task { [taskManager, _valueSubject] in
281-
// send completion in hop as Combine is using unfair lock (non-recursive). Avoid crash.
282-
// It happens if the stream ratains this store, canceled that stream triggers this deinit operation.
283-
// that deinit operation will be inside of locking session.
284-
_valueSubject.send(completion: .finished)
268+
Task { [taskManager] in
285269
await taskManager.cancelAll()
286270
}
287271
}

Tests/VergeTests/PropertyWrapperTests.swift

Lines changed: 0 additions & 25 deletions
This file was deleted.

Tests/VergeTests/Retain/PublisherCompletionTests.swift

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,10 @@ final class SubjectCompletionTests: XCTestCase {
1313
var store: DemoStore? = DemoStore()
1414
weak var weakStore: DemoStore? = store
1515

16-
let exp = expectation(description: "completion")
17-
1816
store?.statePublisher()
1917
.sink(
2018
receiveCompletion: { _ in
21-
exp.fulfill()
19+
XCTFail()
2220
},
2321
receiveValue: { _ in }
2422
)
@@ -30,7 +28,6 @@ final class SubjectCompletionTests: XCTestCase {
3028

3129
XCTAssertNil(weakStore)
3230

33-
wait(for: [exp], timeout: 10)
3431
}
3532

3633
func testActivityPublisherCompletion1() {
@@ -40,13 +37,11 @@ final class SubjectCompletionTests: XCTestCase {
4037
var store: DemoStore? = DemoStore()
4138
weak var weakStore: DemoStore? = store
4239

43-
let exp = expectation(description: "completion")
44-
4540
store?
4641
.activityPublisher()
4742
.sink(
4843
receiveCompletion: { _ in
49-
exp.fulfill()
44+
XCTFail()
5045
},
5146
receiveValue: { _ in
5247

@@ -62,7 +57,6 @@ final class SubjectCompletionTests: XCTestCase {
6257
bag.forEach { $0.cancel() }
6358
XCTAssertNil(weakStore)
6459

65-
wait(for: [exp], timeout: 10)
6660
}
6761

6862
func testActivityPublisherCompletion2() {
@@ -103,13 +97,10 @@ final class SubjectCompletionTests: XCTestCase {
10397
var strongRef: Ref? = Ref()
10498
weak var weakRef = strongRef
10599

106-
let e = expectation(description: "completion")
107-
108100
store!.activityPublisher()
109101
.sink(
110102
receiveCompletion: { _ in
111-
print("")
112-
e.fulfill()
103+
XCTFail()
113104
},
114105
receiveValue: { _ in
115106
print("")
@@ -125,7 +116,6 @@ final class SubjectCompletionTests: XCTestCase {
125116

126117
XCTAssertNil(weakRef)
127118

128-
wait(for: [e], timeout: 10)
129119
}
130120

131121
func testDerived_publisher_retains_derived() {
@@ -172,16 +162,13 @@ final class SubjectCompletionTests: XCTestCase {
172162

173163
XCTAssertNil(storeRef.value)
174164

175-
let onComplete = expectation(description: "onComplete")
176-
177165
c = derivedRef.value!
178166
.statePublisher()
179167
.sink(
180168
receiveCompletion: { _ in
181-
onComplete.fulfill()
169+
XCTFail()
182170
},
183171
receiveValue: { _ in
184-
185172
}
186173
)
187174

@@ -193,7 +180,6 @@ final class SubjectCompletionTests: XCTestCase {
193180

194181
XCTAssertNil(storeRef.value)
195182

196-
await fulfillment(of: [onComplete], timeout: 10)
197183
withExtendedLifetime(c, {})
198184
// c?.cancel()
199185

0 commit comments

Comments
 (0)