Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,6 @@ def driver(cls, uri, *, auth=None, **config):
URI_SCHEME_NEO4J_SELF_SIGNED_CERTIFICATE,
URI_SCHEME_NEO4J_SECURE,
)
from neo4j.conf import (
TRUST_ALL_CERTIFICATES,
TRUST_SYSTEM_CA_SIGNED_CERTIFICATES
)

driver_type, security_type, parsed = parse_neo4j_uri(uri)

Expand Down Expand Up @@ -329,7 +325,7 @@ def open(cls, target, *, auth=None, **config):
from neo4j.io import BoltPool
address = cls.parse_target(target)
pool_config, default_workspace_config = Config.consume_chain(config, PoolConfig, WorkspaceConfig)
pool = BoltPool.open(address, auth=auth, **pool_config)
pool = BoltPool.open(address, auth=auth, pool_config=pool_config, workspace_config=default_workspace_config)
return cls(pool, default_workspace_config)

def __init__(self, pool, default_workspace_config):
Expand Down Expand Up @@ -372,7 +368,7 @@ def open(cls, *targets, auth=None, routing_context=None, **config):
from neo4j.io import Neo4jPool
addresses = cls.parse_targets(*targets)
pool_config, default_workspace_config = Config.consume_chain(config, PoolConfig, WorkspaceConfig)
pool = Neo4jPool.open(*addresses, auth=auth, routing_context=routing_context, **pool_config)
pool = Neo4jPool.open(*addresses, auth=auth, routing_context=routing_context, pool_config=pool_config, workspace_config=default_workspace_config)
return cls(pool, default_workspace_config)

def __init__(self, pool, default_workspace_config):
Expand All @@ -381,7 +377,6 @@ def __init__(self, pool, default_workspace_config):
self._default_workspace_config = default_workspace_config

def session(self, **config):
from neo4j.work.simple import Session
session_config = SessionConfig(self._default_workspace_config, config)
SessionConfig.consume(config) # Consume the config
return Session(self._pool, session_config)
Expand Down
110 changes: 76 additions & 34 deletions neo4j/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,13 @@
SessionExpired,
ReadServiceUnavailable,
WriteServiceUnavailable,
ConfigurationError,
)
from neo4j.routing import RoutingTable
from neo4j.conf import (
PoolConfig,
WorkspaceConfig,
)

# Set up logger
log = getLogger("neo4j")
Expand Down Expand Up @@ -167,32 +172,32 @@ def ping(cls, address, *, timeout=None, **config):
return protocol_version

@classmethod
def open(cls, address, *, auth=None, timeout=None, **config):
def open(cls, address, *, auth=None, timeout=None, **pool_config):
""" Open a new Bolt connection to a given server address.

:param address:
:param auth:
:param timeout:
:param config:
:param timeout: The connection timeout
:param pool_config:
:return:
:raise BoltHandshakeError: raised if the Bolt Protocol can not negotiate a protocol version.
:raise ServiceUnavailable: raised if there was a connection issue.
"""
config = PoolConfig.consume(config)
s, config.protocol_version, handshake, data = connect(
pool_config = PoolConfig.consume(pool_config)
s, pool_config.protocol_version, handshake, data = connect(
address,
timeout=timeout,
custom_resolver=config.resolver,
ssl_context=config.get_ssl_context(),
keep_alive=config.keep_alive,
custom_resolver=pool_config.resolver,
ssl_context=pool_config.get_ssl_context(),
keep_alive=pool_config.keep_alive,
)

if config.protocol_version == (3, 0):
if pool_config.protocol_version == (3, 0):
from neo4j.io._bolt3 import Bolt3
connection = Bolt3(address, s, auth=auth, **config)
elif config.protocol_version == (4, 0):
connection = Bolt3(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent)
elif pool_config.protocol_version == (4, 0):
from neo4j.io._bolt4x0 import Bolt4x0
connection = Bolt4x0(address, s, auth=auth, **config)
connection = Bolt4x0(address, s, pool_config.max_connection_lifetime, auth=auth, user_agent=pool_config.user_agent)
else:
log.debug("[#%04X] S: <CLOSE>", s.getpeername()[1])
s.shutdown(SHUT_RDWR)
Expand Down Expand Up @@ -327,15 +332,15 @@ def defunct(self):
class IOPool:
""" A collection of connections to one or more server addresses.
"""
from neo4j.conf import PoolConfig, WorkspaceConfig

_default_acquire_timeout = WorkspaceConfig.connection_acquisition_timeout

def __init__(self, opener, pool_config):
def __init__(self, opener, pool_config, workspace_config):
assert callable(opener)
assert isinstance(pool_config, PoolConfig)
assert isinstance(workspace_config, WorkspaceConfig)

self.opener = opener
self.pool_config = pool_config
self.workspace_config = workspace_config
self.connections = {}
self.lock = RLock()
self.cond = Condition(self.lock)
Expand All @@ -355,7 +360,7 @@ def _acquire(self, address, timeout):
"""
t0 = perf_counter()
if timeout is None:
timeout = self._default_acquire_timeout
timeout = self.workspace_config.connection_acquisition_timeout

with self.lock:
try:
Expand Down Expand Up @@ -482,19 +487,18 @@ def close(self):
class BoltPool(IOPool):

@classmethod
def open(cls, address, *, auth=None, **config):
pool_config = PoolConfig.consume(config)
def open(cls, address, *, auth=None, pool_config, workspace_config):

def opener(addr, timeout):
return Bolt.open(addr, auth=auth, timeout=timeout, **pool_config)

pool = cls(opener, pool_config, address)
pool = cls(opener, pool_config, workspace_config, address)
seeds = [pool.acquire() for _ in range(pool_config.init_size)]
pool.release(*seeds)
return pool

def __init__(self, opener, pool_config, address):
super(BoltPool, self).__init__(opener, pool_config)
def __init__(self, opener, pool_config, workspace_config, address):
super(BoltPool, self).__init__(opener, pool_config, workspace_config)
self.address = address

def __repr__(self):
Expand All @@ -509,13 +513,13 @@ class Neo4jPool(IOPool):
"""

@classmethod
def open(cls, *addresses, auth=None, routing_context=None, **config):
pool_config = PoolConfig.consume(config)
def open(cls, *addresses, auth=None, routing_context=None, pool_config=None, workspace_config=None):

def opener(addr, timeout):
return Bolt.open(addr, auth=auth, timeout=timeout, **pool_config)

pool = cls(opener, pool_config, addresses, routing_context)
pool = cls(opener, pool_config, workspace_config, routing_context, addresses)

try:
pool.update_routing_table()
except Exception:
Expand All @@ -524,8 +528,8 @@ def opener(addr, timeout):
else:
return pool

def __init__(self, opener, pool_config, addresses, routing_context):
super(Neo4jPool, self).__init__(opener, pool_config)
def __init__(self, opener, pool_config, workspace_config, routing_context, addresses):
super(Neo4jPool, self).__init__(opener, pool_config, workspace_config)
self.routing_table = RoutingTable(addresses)
self.routing_context = routing_context
self.missing_writer = False
Expand All @@ -539,16 +543,25 @@ def __repr__(self):
def initial_address(self):
return self.routing_table.initial_routers[0]

def fetch_routing_info(self, address, timeout=None):
def fetch_routing_info(self, address, timeout=None, database=None):
""" Fetch raw routing info from a given router address.

:param address: router address
:param timeout: seconds
:param database: the data base name to get routing table for
:return: list of routing records or
None if no connection could be established
:raise ServiceUnavailable: if the server does not support routing or
if routing support is broken
"""
from neo4j.io._bolt3 import Bolt3
from neo4j.io._bolt4x0 import Bolt4x0
from neo4j.api import (
SYSTEM_DATABASE,
DEFAULT_DATABASE,
READ_ACCESS,
)

metadata = {}
records = []

Expand All @@ -562,8 +575,39 @@ def fail(md):
with self._acquire(address, timeout) as cx:
_, _, server_version = (cx.server.agent or "").partition("/")
log.debug("[#%04X] C: <ROUTING> query=%r", cx.local_port, self.routing_context or {})
cx.run("CALL dbms.cluster.routing.getRoutingTable($context)",
{"context": self.routing_context}, on_success=metadata.update, on_failure=fail)

if database is None:
database = self.workspace_config.database

if cx.PROTOCOL_VERSION == Bolt3.PROTOCOL_VERSION:
if database != DEFAULT_DATABASE:
raise ConfigurationError("Database name parameter for selecting database is not supported in Bolt Protocol {!r}. Database name {!r}. Server Agent {!r}.".format(
Bolt3.PROTOCOL_VERSION, database, cx.server.agent))
cx.run(
"CALL dbms.cluster.routing.getRoutingTable($context)", # This is an internal procedure call. Only available if the Neo4j 3.5 is setup with clustering.
{"context": self.routing_context},
on_success=metadata.update,
on_failure=fail,
)
elif cx.PROTOCOL_VERSION == Bolt4x0.PROTOCOL_VERSION:
if database == DEFAULT_DATABASE:
cx.run(
"CALL dbms.routing.getRoutingTable($context)",
{"context": self.routing_context},
mode="r",
db=SYSTEM_DATABASE,
on_success=metadata.update,
on_failure=fail,
)
else:
cx.run(
"CALL dbms.routing.getRoutingTable($context, $database)",
{"context": self.routing_context, "database": database},
mode="r",
db=SYSTEM_DATABASE,
on_success=metadata.update,
on_failure=fail,
)
cx.pull(on_success=metadata.update, on_records=records.extend)
cx.send_all()
cx.fetch_all()
Expand Down Expand Up @@ -623,14 +667,12 @@ def update_routing_table_from(self, *routers):
:return: True if the routing table is successfully updated,
otherwise False
"""
log.debug("Attempting to update routing table from "
"{}".format(", ".join(map(repr, routers))))
log.debug("Attempting to update routing table from {}".format(", ".join(map(repr, routers))))
for router in routers:
new_routing_table = self.fetch_routing_table(router)
if new_routing_table is not None:
self.routing_table.update(new_routing_table)
log.debug("Successfully updated routing table from "
"{!r} ({!r})".format(router, self.routing_table))
log.debug("Successfully updated routing table from {!r} ({!r})".format(router, self.routing_table))
return True
return False

Expand Down
11 changes: 5 additions & 6 deletions neo4j/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class Bolt3(Bolt):
#: The pool of which this connection is a member
pool = None

def __init__(self, unresolved_address, sock, *, auth=None, **config):
self.config = PoolConfig.consume(config)
def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None):
# self.pool_config = PoolConfig.consume(pool_config)
self.unresolved_address = unresolved_address
self.socket = sock
self.server = ServerInfo(Address(sock.getpeername()), Bolt3.PROTOCOL_VERSION)
Expand All @@ -86,11 +86,11 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config):
self.packer = Packer(self.outbox)
self.unpacker = Unpacker(self.inbox)
self.responses = deque()
self._max_connection_lifetime = self.config.max_connection_lifetime
self._max_connection_lifetime = max_connection_lifetime
self._creation_timestamp = perf_counter()

# Determine the user agent
user_agent = self.config.user_agent
# user_agent = self.pool_config.user_agent
if user_agent:
self.user_agent = user_agent
else:
Expand Down Expand Up @@ -144,8 +144,7 @@ def hello(self):
self.send_all()
self.fetch_all()

def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
timeout=None, db=None, **handlers):
def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers):
if db is not None:
raise ValueError("Database selection is not supported in Bolt 3")
if not parameters:
Expand Down
11 changes: 5 additions & 6 deletions neo4j/io/_bolt4x0.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class Bolt4x0(Bolt):
#: The pool of which this connection is a member
pool = None

def __init__(self, unresolved_address, sock, *, auth=None, **config):
self.config = PoolConfig.consume(config)
def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=None, user_agent=None):
#self.pool_config = PoolConfig.consume(pool_config)
self.unresolved_address = unresolved_address
self.socket = sock
self.server = ServerInfo(Address(sock.getpeername()), Bolt4x0.PROTOCOL_VERSION)
Expand All @@ -86,11 +86,11 @@ def __init__(self, unresolved_address, sock, *, auth=None, **config):
self.packer = Packer(self.outbox)
self.unpacker = Unpacker(self.inbox)
self.responses = deque()
self._max_connection_lifetime = self.config.max_connection_lifetime
self._max_connection_lifetime = max_connection_lifetime # self.pool_config.max_connection_lifetime
self._creation_timestamp = perf_counter()

# Determine the user agent
user_agent = self.config.user_agent
# user_agent = self.pool_config.user_agent
if user_agent:
self.user_agent = user_agent
else:
Expand Down Expand Up @@ -144,8 +144,7 @@ def hello(self):
self.send_all()
self.fetch_all()

def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
timeout=None, db=None, **handlers):
def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers):
if not parameters:
parameters = {}
extra = {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
!: BOLT 3
!: PORT 9001

C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"}
S: SUCCESS {"server": "Neo4j/3.5.0", "connection_id": "12345678-1234-1234-1234-123456789000"}

C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [1234, [{"role":"WRITE", "addresses":["127.0.0.1:9001"]}, {"role":"READ", "addresses":["127.0.0.1:9002", "127.0.0.1:9003"]}, {"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}]]
SUCCESS {}

C: GOODBYE
S: <EXIT>
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
!: BOLT 4
!: PORT 9001

C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"}
S: SUCCESS {"server": "Neo4j/4.0.0", "connection_id": "12345678-1234-1234-1234-123456789000"}

C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {}} {"mode": "r", "db": "system"}
PULL {"n": -1}
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [1234, [{"role":"WRITE", "addresses":["127.0.0.1:9001"]}, {"role":"READ", "addresses":["127.0.0.1:9002", "127.0.0.1:9003"]}, {"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}]]
SUCCESS {}

C: GOODBYE
S: <EXIT>
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
!: BOLT 4
!: PORT 9001

C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"}
S: SUCCESS {"server": "Neo4j/4.0.0", "connection_id": "12345678-1234-1234-1234-123456789000"}

C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": {}, "database": "neo4j"} {"mode": "r", "db": "system"}
PULL {"n": -1}
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [1234, [{"role":"WRITE", "addresses":["127.0.0.1:9001"]}, {"role":"READ", "addresses":["127.0.0.1:9002", "127.0.0.1:9003"]}, {"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]}]]
SUCCESS {}

C: GOODBYE
S: <EXIT>
2 changes: 1 addition & 1 deletion tests/stub/scripts/v4x0/router.script
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
!: AUTO RESET
!: PORT 9001

C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {}} {"mode": "r", "db": "system"}
PULL {"n": -1}
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
!: AUTO RESET
!: PORT 9001

C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"name": "molly", "age": "1"}} {}
C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {"name": "molly", "age": "1"}} {"mode": "r", "db": "system"}
PULL {"n": -1}
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [302, [{"role":"ROUTE", "addresses":["127.0.0.1:9001", "127.0.0.1:9002"]}, {"role":"READ", "addresses":["127.0.0.1:9002"]}, {"role":"WRITE", "addresses":["127.0.0.1:9001"]}]]
Expand Down
Loading