Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit 7dcabdd

Browse files
authored
feat: make switch a state machine (#278)
* feat: add basic state machine functionality to switch * feat: make connections state machines * refactor: clean up logs * feat: add dialFSM to the switch * feat: add better support for closing connections * test: add tests for some uncovered lines * feat: add warning emitter for muxer upgrade failed * docs: update readme
1 parent e3b4b42 commit 7dcabdd

21 files changed

+1714
-668
lines changed

README.md

Lines changed: 87 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,17 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca
2626
- [Usage](#usage)
2727
- [Create a libp2p switch](#create-a-libp2p-switch)
2828
- [API](#api)
29-
- [`switch.dial(peer, protocol, callback)`](#swarmdialpi-protocol-callback)
30-
- [`switch.hangUp(peer, callback)`](#swarmhanguppi-callback)
31-
- [`switch.handle(protocol, handler)`](#swarmhandleprotocol-handler)
32-
- [`switch.unhandle(protocol)`](#swarmunhandleprotocol)
33-
- [`switch.start(callback)`](#swarmlistencallback)
34-
- [`switch.stop(callback)`](#swarmclosecallback)
35-
- [`switch.connection`](#connection)
29+
- [`switch.connection`](#switchconnection)
30+
- [`switch.dial(peer, protocol, callback)`](#switchdialpeer-protocol-callback)
31+
- [`switch.dialFSM(peer, protocol, callback)`](#switchdialfsmpeer-protocol-callback)
32+
- [`switch.handle(protocol, handlerFunc, matchFunc)`](#switchhandleprotocol-handlerfunc-matchfunc)
33+
- [`switch.hangUp(peer, callback)`](#switchhanguppeer-callback)
34+
- [`switch.start(callback)`](#switchstartcallback)
35+
- [`switch.stop(callback)`](#switchstopcallback)
3636
- [`switch.stats`](#stats-api)
37-
- [Internal Transports API](#transports)
38-
- [Design Notes](#designnotes)
37+
- [`switch.unhandle(protocol)`](#switchunhandleprotocol)
38+
- [Internal Transports API](#internal-transports-api)
39+
- [Design Notes](#design-notes)
3940
- [Multitransport](#multitransport)
4041
- [Connection upgrades](#connection-upgrades)
4142
- [Identify](#identify)
@@ -86,6 +87,46 @@ tests]([./test/pnet.node.js]).
8687
- peerInfo is a [PeerInfo](https://github.com/libp2p/js-peer-info) object that has the peer information.
8788
- peerBook is a [PeerBook](https://github.com/libp2p/js-peer-book) object that stores all the known peers.
8889

90+
### `switch.connection`
91+
92+
##### `switch.connection.addUpgrade()`
93+
94+
A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification.
95+
96+
> **WIP**
97+
98+
##### `switch.connection.addStreamMuxer(muxer)`
99+
100+
Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/libp2p/interface-stream-muxer) spec.
101+
102+
- `muxer`
103+
104+
##### `switch.connection.reuse()`
105+
106+
Enable the identify protocol.
107+
108+
##### `switch.connection.crypto([tag, encrypt])`
109+
110+
Enable a specified crypto protocol. By default no encryption is used, aka `plaintext`. If called with no arguments it resets to use `plaintext`.
111+
112+
You can use for example [libp2p-secio](https://github.com/libp2p/js-libp2p-secio) like this
113+
114+
```js
115+
const secio = require('libp2p-secio')
116+
switch.connection.crypto(secio.tag, secio.encrypt)
117+
```
118+
119+
##### `switch.connection.enableCircuitRelay(options, callback)`
120+
121+
Enable circuit relaying.
122+
123+
- `options`
124+
- enabled - activates relay dialing and listening functionality
125+
- hop - an object with two properties
126+
- enabled - enables circuit relaying
127+
- active - is it an active or passive relay (default false)
128+
- `callback`
129+
89130
### `switch.dial(peer, protocol, callback)`
90131

91132
dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point where we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, then do nothing.
@@ -94,13 +135,24 @@ dial uses the best transport (whatever works first, in the future we can have so
94135
- `protocol`
95136
- `callback`
96137

97-
### `switch.hangUp(peer, callback)`
138+
### `switch.dialFSM(peer, protocol, callback)`
98139

99-
Hang up the muxed connection we have with the peer.
140+
works like dial, but calls back with a [Connection State Machine](#connection-state-machine)
100141

101142
- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
102-
- `callback`
143+
- `protocol`: String that defines the protocol (e.g '/ipfs/bitswap/1.1.0') to be used
144+
- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine)
103145
146+
#### Connection State Machine
147+
Connection state machines emit a number of events that can be used to determine the current state of the connection
148+
and to received the underlying connection that can be used to transfer data.
149+
150+
##### Events
151+
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
152+
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
153+
- `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted.
154+
- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/interface-connection) will be emitted.
155+
- `close`: emitted when the connection has closed.
104156
105157
### `switch.handle(protocol, handlerFunc, matchFunc)`
106158

@@ -110,68 +162,43 @@ Handle a new protocol.
110162
- `handlerFunc` - function called when we receive a dial on `protocol. Signature must be `function (protocol, conn) {}`
111163
- `matchFunc` - matchFunc for multistream-select
112164
113-
### `switch.unhandle(protocol)`
114-
115-
Unhandle a protocol.
116-
117-
- `protocol`
118-
119-
### `switch.on('peer-mux-established', (peer) => {})`
120-
121-
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just established a muxed connection with.
122-
123-
### `switch.on('peer-mux-closed', (peer) => {})`
124-
125-
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection.
126-
127-
### `switch.start(callback)`
128-
129-
Start listening on all added transports that are available on the current `peerInfo`.
130-
131-
### `switch.stop(callback)`
165+
### `switch.hangUp(peer, callback)`
132166

133-
Close all the listeners and muxers.
167+
Hang up the muxed connection we have with the peer.
134168

169+
- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
135170
- `callback`
136171

137-
### `switch.connection`
172+
### `switch.on('error', (err) => {})`
138173

139-
##### `switch.connection.addUpgrade()`
174+
Emitted when the switch encounters an error.
140175

141-
A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/libp2p/interface-connection) specification.
176+
- `err`: instance of [Error][]
142177

143-
> **WIP**
178+
### `switch.on('peer-mux-established', (peer) => {})`
144179

145-
##### `switch.connection.addStreamMuxer(muxer)`
180+
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just established a muxed connection with.
146181

147-
Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/libp2p/interface-stream-muxer) spec.
182+
### `switch.on('peer-mux-closed', (peer) => {})`
148183

149-
- `muxer`
184+
- `peer`: is instance of [PeerInfo][] that has info of the peer we have just closed a muxed connection.
150185

151-
##### `switch.connection.reuse()`
186+
### `switch.on('start', () => {})`
152187

153-
Enable the identify protocol.
188+
Emitted when the switch has successfully started.
154189

155-
##### `switch.connection.crypto([tag, encrypt])`
190+
### `switch.on('stop', () => {})`
156191

157-
Enable a specified crypto protocol. By default no encryption is used, aka `plaintext`. If called with no arguments it resets to use `plaintext`.
192+
Emitted when the switch has successfully stopped.
158193

159-
You can use for example [libp2p-secio](https://github.com/libp2p/js-libp2p-secio) like this
194+
### `switch.start(callback)`
160195

161-
```js
162-
const secio = require('libp2p-secio')
163-
switch.connection.crypto(secio.tag, secio.encrypt)
164-
```
196+
Start listening on all added transports that are available on the current `peerInfo`.
165197

166-
##### `switch.connection.enableCircuitRelay(options, callback)`
198+
### `switch.stop(callback)`
167199

168-
Enable circuit relaying.
200+
Close all the listeners and muxers.
169201

170-
- `options`
171-
- enabled - activates relay dialing and listening functionality
172-
- hop - an object with two properties
173-
- enabled - enables circuit relaying
174-
- active - is it an active or passive relay (default false)
175202
- `callback`
176203

177204
### Stats API
@@ -278,6 +305,11 @@ Each one of these values is [an exponential moving-average instance](https://git
278305

279306
Stats are not updated in real-time. Instead, measurements are buffered and stats are updated at an interval. The maximum interval can be defined through the `Switch` constructor option `stats.computeThrottleTimeout`, defined in miliseconds.
280307

308+
### `switch.unhandle(protocol)`
309+
310+
Unhandle a protocol.
311+
312+
- `protocol`
281313

282314
### Internal Transports API
283315

package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"devDependencies": {
3939
"aegir": "^15.1.0",
4040
"chai": "^4.1.2",
41+
"chai-checkmark": "^1.0.1",
4142
"dirty-chai": "^2.0.1",
4243
"libp2p-mplex": "~0.8.2",
4344
"libp2p-pnet": "~0.1.0",
@@ -54,7 +55,10 @@
5455
"dependencies": {
5556
"async": "^2.6.1",
5657
"big.js": "^5.1.2",
58+
"class-is": "^1.1.0",
5759
"debug": "^3.1.0",
60+
"err-code": "^1.1.2",
61+
"fsm-event": "^2.1.0",
5862
"hashlru": "^2.2.1",
5963
"interface-connection": "~0.3.2",
6064
"ip-address": "^5.8.9",

src/connection/base.js

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
'use strict'
2+
3+
const EventEmitter = require('events').EventEmitter
4+
const debug = require('debug')
5+
const withIs = require('class-is')
6+
7+
class BaseConnection extends EventEmitter {
8+
constructor ({ _switch, name }) {
9+
super()
10+
11+
this.switch = _switch
12+
this.ourPeerInfo = this.switch._peerInfo
13+
this.log = debug(`libp2p:conn:${name}`)
14+
}
15+
16+
/**
17+
* Gets the current state of the connection
18+
*
19+
* @returns {string} The current state of the connection
20+
*/
21+
getState () {
22+
return this._state._state
23+
}
24+
25+
/**
26+
* Puts the state into encrypting mode
27+
*
28+
* @returns {void}
29+
*/
30+
encrypt () {
31+
this._state('encrypt')
32+
}
33+
34+
/**
35+
* Puts the state into privatizing mode
36+
*
37+
* @returns {void}
38+
*/
39+
protect () {
40+
this._state('privatize')
41+
}
42+
43+
/**
44+
* Puts the state into muxing mode
45+
*
46+
* @returns {void}
47+
*/
48+
upgrade () {
49+
this._state('upgrade')
50+
}
51+
52+
/**
53+
* Event handler for disconnected.
54+
*
55+
* @fires BaseConnection#close
56+
* @returns {void}
57+
*/
58+
_onDisconnected () {
59+
this.log(`disconnected from ${this.theirB58Id}`)
60+
this.emit('close')
61+
this.removeAllListeners()
62+
}
63+
64+
/**
65+
* Event handler for privatized
66+
*
67+
* @fires BaseConnection#private
68+
* @returns {void}
69+
*/
70+
_onPrivatized () {
71+
this.log(`successfully privatized incoming connection`)
72+
this.emit('private', this.conn)
73+
}
74+
75+
/**
76+
* Wraps this.conn with the Switch.protector for private connections
77+
*
78+
* @private
79+
* @fires ConnectionFSM#error
80+
* @returns {void}
81+
*/
82+
_onPrivatizing () {
83+
if (!this.switch.protector) {
84+
return this._state('done')
85+
}
86+
87+
this.conn = this.switch.protector.protect(this.conn, (err) => {
88+
if (err) {
89+
this.emit('error', err)
90+
return this._state('disconnect')
91+
}
92+
93+
this.log(`successfully privatized conn to ${this.theirB58Id}`)
94+
this.conn.setPeerInfo(this.theirPeerInfo)
95+
this._state('done')
96+
})
97+
}
98+
}
99+
100+
module.exports = withIs(BaseConnection, {
101+
className: 'BaseConnection',
102+
symbolName: 'libp2p-switch/BaseConnection'
103+
})

src/connection/handler.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
'use strict'
2+
3+
const debug = require('debug')
4+
const IncomingConnection = require('./incoming')
5+
const observeConn = require('../observe-connection')
6+
7+
function listener (_switch) {
8+
const log = debug(`libp2p:switch:listener`)
9+
10+
/**
11+
* Takes a transport key and returns a connection handler function
12+
*
13+
* @param {string} transportKey The key of the transport to handle connections for
14+
* @param {function} handler A custom handler to use
15+
* @returns {function(Connection)} A connection handler function
16+
*/
17+
return (transportKey, handler) => {
18+
/**
19+
* Takes a base connection and manages listening behavior
20+
*
21+
* @param {Connection} conn The connection to manage
22+
* @returns {void}
23+
*/
24+
return (conn) => {
25+
// Add a transport level observer, if needed
26+
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn
27+
28+
log('received incoming connection')
29+
const connFSM = new IncomingConnection({ connection, _switch, transportKey })
30+
31+
connFSM.once('error', (err) => log(err))
32+
connFSM.once('private', (_conn) => {
33+
// Use the custom handler, if it was provided
34+
if (handler) {
35+
return handler(_conn)
36+
}
37+
connFSM.encrypt()
38+
})
39+
connFSM.once('encrypted', () => connFSM.upgrade())
40+
41+
connFSM.protect()
42+
}
43+
}
44+
}
45+
46+
module.exports = listener

0 commit comments

Comments
 (0)