Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
408aa83
Update tests for 5.0 handshake
fbiville Jan 27, 2022
62400fd
Remove Bolt 4.0 feature flag, bump some 4.x to 4.4
fbiville Jan 27, 2022
a9efd7a
Add 5.0 variant to parameterized tests
fbiville Jan 28, 2022
13ae849
Fix noqa comment
fbiville Feb 24, 2022
0a67cac
Make Routing_v4x4 delegate to Routing_v5x0
fbiville Feb 24, 2022
635478f
Fix formatting
fbiville Feb 28, 2022
9af08eb
Remove duplicated scripts
fbiville Feb 28, 2022
f764676
Reinstate blank lines for classes and scripts
fbiville Feb 28, 2022
8208881
add changes for element id in protocol
thelonelyvulpes Feb 28, 2022
06cd316
Reinstate in 4x1 wrongly deleted 4x0 script
fbiville Mar 1, 2022
1cc684c
Merge branch '5.0_handshake' into feature/element-id-changes
thelonelyvulpes Mar 1, 2022
2ca79e8
Reinstate in 4x4 wrongly deleted 4x0 script
fbiville Mar 1, 2022
83d26ac
basic node tests in freki
thelonelyvulpes Mar 1, 2022
54f860f
rename tests
thelonelyvulpes Mar 1, 2022
715e857
relationships and remove duplicate scripts
thelonelyvulpes Mar 1, 2022
49fa18f
Merge branch '5.0_handshake' into feature/element-id-changes
thelonelyvulpes Mar 1, 2022
2987884
Delete reader_return_1_failure_return_2_3_4_and_5_succeed.script
thelonelyvulpes Mar 1, 2022
0185e17
undo change
thelonelyvulpes Mar 1, 2022
b14d00f
dedupe further
thelonelyvulpes Mar 1, 2022
45c14b9
get path deserializing
thelonelyvulpes Mar 2, 2022
e9e5f1d
get path deserializing
thelonelyvulpes Mar 3, 2022
70976ac
fix path serialization in 4.4
thelonelyvulpes Mar 3, 2022
b2795d0
add missing assertions on path
thelonelyvulpes Mar 3, 2022
794713e
add missing jolt simple tests for brevity.
thelonelyvulpes Mar 3, 2022
f9b0c5b
try and improve readability of logic in transformers
thelonelyvulpes Mar 3, 2022
2facb54
undo a change
thelonelyvulpes Mar 3, 2022
7d08a5d
tidying node/rel decoding
thelonelyvulpes Mar 3, 2022
c1e4f42
Update tests for 5.0 handshake
fbiville Jan 27, 2022
b81a69f
Remove Bolt 4.0 feature flag, bump some 4.x to 4.4
fbiville Jan 27, 2022
85e5b76
Add 5.0 variant to parameterized tests
fbiville Jan 28, 2022
1553d41
Fix noqa comment
fbiville Feb 24, 2022
7a35b3d
Make Routing_v4x4 delegate to Routing_v5x0
fbiville Feb 24, 2022
7a0c7f8
Fix formatting
fbiville Feb 28, 2022
064545b
Remove duplicated scripts
fbiville Feb 28, 2022
9f81ff3
Reinstate blank lines for classes and scripts
fbiville Feb 28, 2022
14b1e75
Reinstate in 4x4 wrongly deleted 4x0 script
fbiville Mar 1, 2022
437ad96
Refactor changes to enable future packstream/jolt versions
robsdedude Mar 4, 2022
9a52523
Merge branch '5.0_handshake' into feature/element-id-changes
robsdedude Mar 4, 2022
c59877a
Merge branch '5.0' into feature/element-id-changes
robsdedude Mar 8, 2022
08bcd9b
Fix code style
robsdedude Mar 8, 2022
c4ca562
add element id feature flag
thelonelyvulpes Mar 8, 2022
79b6d3d
update the name for element id feature toggle
thelonelyvulpes Mar 8, 2022
72062c1
reorder import
thelonelyvulpes Mar 8, 2022
dcc9cad
fix test
thelonelyvulpes Mar 8, 2022
79e19cb
Remove API_GRAPH_TYPES_ELEMENT_ID feature flag
robsdedude Mar 8, 2022
cee03c9
boltstub: fix treating bolt messages as packstream structs
robsdedude Mar 8, 2022
da801df
Add implementation detail flag `DETAIL_NULL_ON_MISSING_ID`
robsdedude Mar 8, 2022
7c733ba
use bolt_5.0 feature check for all as 4.4 doesn't work
thelonelyvulpes Mar 8, 2022
6b06fe3
Merge remote-tracking branch 'origin/feature/element-id-changes' into…
thelonelyvulpes Mar 8, 2022
a8bd9ac
Revert "use bolt_5.0 feature check for all as 4.4 doesn't work"
robsdedude Mar 8, 2022
5197d4e
Make protocol lenient to old style cypher types.
robsdedude Mar 8, 2022
de28cd9
Merge branch '5.0' into feature/element-id-changes
thelonelyvulpes Mar 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions boltstub/bolt_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ def get_bolt_protocol(version):
if version is None:
raise BoltMissingVersionError()
for sub in recursive_subclasses(BoltProtocol):
if (version == sub.protocol_version
or version in sub.version_aliases):
if version == sub.protocol_version or version in sub.version_aliases:
return sub
raise BoltUnknownVersionError(
"unsupported bolt version {}".format(version)
Expand Down Expand Up @@ -371,3 +370,13 @@ class Bolt4x4Protocol(Bolt4x3Protocol):
equivalent_versions = set()

server_agent = "Neo4j/4.4.0"


class Bolt5x0Protocol(Bolt4x4Protocol):

protocol_version = (5, 0)
version_aliases = set()
# allow the server to negotiate other bolt versions
equivalent_versions = set()

server_agent = "Neo4j/5.0.0"
3 changes: 2 additions & 1 deletion boltstub/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
class Channel:
def __init__(self, wire, bolt_version, log_cb=None, handshake_data=None):
self.wire = wire
self.stream = PackStream(wire)
self.bolt_protocol = get_bolt_protocol(bolt_version)
self.stream = PackStream(wire,
self.bolt_protocol.protocol_version[0] < 5)
self.log = log_cb
self.handshake_data = handshake_data
self._buffered_msg = None
Expand Down
274 changes: 202 additions & 72 deletions boltstub/packstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import inspect
from codecs import decode
from io import BytesIO
from struct import pack as struct_pack
Expand Down Expand Up @@ -62,10 +62,15 @@ class StructTag:

class Structure:

def __init__(self, tag, *fields, verified=True):
def __init__(self, tag, *fields, verified=True,
validate_old_id=None):
self.tag = tag
self.fields = list(fields)
self._verified = verified
if validate_old_id is None:
self.validate_either = True
self.validate_v3 = validate_old_id

if verified:
self._verify_fields()

Expand All @@ -76,58 +81,124 @@ def verified(self):
def _verify_fields(self):
tag, fields = self.tag, self.fields

def verify_node():
if (len(fields) != 3
or not isinstance(fields[0], int)
or not isinstance(fields[1], list)
or not all(isinstance(label, str) for label in fields[1])
or not isinstance(fields[2], dict)
or not all(isinstance(k, str) for k in fields[2].keys())):
raise ValueError("Invalid Node struct received %r" % self)

def verify_relationship():
if (len(fields) != 5
or not isinstance(fields[0], int)
or not isinstance(fields[1], int)
or not isinstance(fields[2], int)
or not isinstance(fields[3], str)
or not isinstance(fields[4], dict)
or not all(isinstance(k, str) for k in fields[4].keys())):
raise ValueError(
"Invalid Relationship struct received %r" % self
)

def verify_unbound_relationship():
if (len(fields) != 3
or not isinstance(fields[0], int)
or not isinstance(fields[1], str)
or not isinstance(fields[2], dict)
or not all(isinstance(k, str) for k in fields[2].keys())):
raise ValueError(
"Invalid UnboundRelationship struct received %r" % self
)
def validate_validations(validations, type_name):
for validation in validations:
if not validation(fields):
raise ValueError(
f"Invalid {type_name} struct received.\n"
f"validation failed: {inspect.getsource(validation)}"
f" {self}")

def verify_node_3_up():
validations = [
lambda f: len(f) == 3,
lambda f: isinstance(f[0], int),
lambda f: isinstance(f[1], list),
lambda f: all(isinstance(label, str) for label in f[1]),
lambda f: isinstance(f[2], dict),
lambda f: all(isinstance(k, str) for k in f[2].keys())
]
validate_validations(validations, "V3_node")

def verify_node_5_up():
if self.validate_either:
try:
verify_node_3_up()
return
except ValueError as _:
pass

validations = [
lambda f: len(f) == 4,
lambda f: isinstance(f[0], int) or f[0] is None,
lambda f: isinstance(f[1], list),
lambda f: all(isinstance(label, str) for label in f[1]),
lambda f: isinstance(f[2], dict),
lambda f: all(isinstance(k, str) for k in f[2].keys()),
lambda f: isinstance(f[3], str)
]
validate_validations(validations, "V5_node")

def verify_unbound_relationship_3_up():
validations = [
lambda f: len(f) == 3,
lambda f: isinstance(f[0], int),
lambda f: isinstance(f[1], str),
lambda f: isinstance(f[2], dict),
lambda f: all(isinstance(k, str) for k in f[2].keys())
]
validate_validations(validations, "V3_UnboundRelationship")

def verify_unbound_relationship_5_up():
if self.validate_either:
try:
verify_unbound_relationship_3_up()
return
except ValueError as _:
pass
validations = [
lambda f: len(f) == 4,
lambda f: isinstance(f[0], int) or f[0] is None,
lambda f: isinstance(f[1], str),
lambda f: isinstance(f[2], dict),
lambda f: all(isinstance(k, str) for k in f[2].keys()),
lambda f: isinstance(f[3], str)
]
validate_validations(validations, "V5_UnboundRelationship")

def verify_relationship_3_up():
validations = [
lambda f: len(f) == 5,
lambda f: isinstance(f[0], int),
lambda f: isinstance(f[1], int),
lambda f: isinstance(f[2], int),
lambda f: isinstance(f[3], str),
lambda f: isinstance(f[4], dict),
lambda f: all(isinstance(k, str) for k in f[4].keys())
]
validate_validations(validations, "V3_Relationship")

def verify_relationship_5_up():
if self.validate_either:
try:
verify_relationship_3_up()
return
except ValueError as _:
pass

validations = [
lambda f: len(f) == 8,
lambda f: (isinstance(f[0], int) or f[0] is None),
lambda f: (isinstance(f[1], int) or f[1] is None),
lambda f: (isinstance(f[2], int) or f[2] is None),
lambda f: isinstance(f[3], str),
lambda f: isinstance(f[4], dict),
lambda f: all(isinstance(k, str) for k in f[4].keys()),
lambda f: isinstance(f[5], str),
lambda f: isinstance(f[6], str),
lambda f: isinstance(f[7], str),
]
validate_validations(validations, "V5_Relationship")

def verify_path():
if (len(fields) != 3
or not isinstance(fields[0], list)
or not all(isinstance(n, Structure)
and n.tag == StructTag.node
and n.fields[0] in fields[2] # id is used
for n in fields[0])
or not isinstance(fields[1], list)
or not all(isinstance(rel, Structure)
and rel.tag == StructTag.unbound_relationship
and rel.fields[0] in fields[2] # id is used
for rel in fields[1])
or not isinstance(fields[2], list)
or not all(isinstance(id_, int)
# id exists in nodes or relationships
and id_ in {s.fields[0]
for s in fields[0] + fields[1]}
for id_ in fields[2])):
raise ValueError(
"Invalid Path struct received %r" % self
)
validations = [
lambda f: len(f) == 3,
lambda f: all(isinstance(n, Structure)
and n.tag == StructTag.node
for n in f[0]),
lambda f: isinstance(f[1], list),
lambda f: all(isinstance(rel, Structure)
and rel.tag == StructTag.unbound_relationship
for rel in f[1]),
lambda f: isinstance(f[2], list),
# index is less than respective array length
# rels indexed from 1, nodes 0
lambda f: all(isinstance(id_, int)
and abs(id_) <= len(f[1])
if i % 2 == 0 else abs(id_) < len(f[0])
for i, id_ in enumerate(f[2]))
]
validate_validations(validations, "Path")

def build_generic_verifier(types, name):
def verify():
Expand All @@ -142,11 +213,14 @@ def verify():

field_validator = {
StructTag.node:
verify_node,
verify_node_3_up if self.validate_v3
else verify_node_5_up,
StructTag.relationship:
verify_relationship,
verify_relationship_3_up if self.validate_v3
else verify_relationship_5_up,
StructTag.unbound_relationship:
verify_unbound_relationship,
verify_unbound_relationship_3_up if self.validate_v3
else verify_unbound_relationship_5_up,
StructTag.path:
verify_path,
StructTag.date:
Expand Down Expand Up @@ -264,27 +338,80 @@ def from_jolt_type(cls, jolt: jolt_types.JoltType):
return cls(StructTag.point_3d, jolt.srid, jolt.x, jolt.y,
jolt.z)
if isinstance(jolt, jolt_types.JoltNode):
if jolt.element_id is not None:
return cls(StructTag.node, jolt.id, jolt.labels,
jolt.properties, jolt.element_id)
return cls(StructTag.node, jolt.id, jolt.labels, jolt.properties)
if isinstance(jolt, jolt_types.JoltRelationship):
if jolt.element_id is None:
return cls(StructTag.relationship, jolt.id, jolt.start_node_id,
jolt.end_node_id, jolt.rel_type, jolt.properties)
return cls(StructTag.relationship, jolt.id, jolt.start_node_id,
jolt.end_node_id, jolt.rel_type, jolt.properties)
jolt.end_node_id, jolt.rel_type, jolt.properties,
jolt.element_id, jolt.start_node_element_id,
jolt.end_node_element_id)
if isinstance(jolt, jolt_types.JoltPath):
# Node structs
uniq_nodes = []
uniq_rels = []
ids = []
nodes = []
rels = []
# Node structs
node_idxs = {}
node_idx = 0
for node in jolt.path[::2]:
node = cls(StructTag.node, node.id, node.labels,
node.properties)
if node not in nodes:
nodes.append(node)
nodes.append(node)
map_node = Structure.from_jolt_type(node)
if map_node not in uniq_nodes:
node_idxs[str(node.id)
if node.element_id is None
else node.element_id] = node_idx
node_idx = node_idx + 1
# nodesDict[str(node.id)] = node
uniq_nodes.append(map_node)

# UnboundRelationship structs
rels = []

rel_idxs = {}
rel_idx = 1
for rel in jolt.path[1::2]:
rel = cls(StructTag.unbound_relationship, rel.id, rel.rel_type,
rel.properties)
if rel not in rels:
rels.append(rel)
ids = [e.id for e in jolt.path]
return cls(StructTag.path, nodes, rels, ids)
rels.append(rel)

ub_rel = (cls(StructTag.unbound_relationship, rel.id,
rel.rel_type,
rel.properties, rel.element_id)
if rel.element_id is not None else
cls(StructTag.unbound_relationship, rel.id,
rel.rel_type,
rel.properties))
if ub_rel not in uniq_rels:
rel_idxs[str(rel.id)
if rel.element_id is None
else rel.element_id] = rel_idx
rel_idx = rel_idx + 1
# relsDict[rel.id_] = rel
uniq_rels.append(ub_rel)

last_node = nodes[0]
for i in range(1, (len(rels)*2)+1):
if i % 2 == 0:
last_node = nodes[int(i/2)]
index = node_idxs[str(last_node.id)
if last_node.element_id is None
else last_node.element_id]
ids.append(index)
else:
rel = rels[int(i/2)]
index = rel_idxs[str(rel.id)
if rel.element_id is None
else rel.element_id]
# print(f"{rel}")
if last_node.id == rel.start_node_id:
ids.append(index)
else:
ids.append(-index)

return cls(StructTag.path, uniq_nodes, uniq_rels, ids)
raise TypeError("Unsupported jolt type: {}".format(type(jolt)))

def to_jolt_type(self):
Expand Down Expand Up @@ -633,8 +760,9 @@ def pack_end_of_stream(self):

class Unpacker:

def __init__(self, unpackable):
def __init__(self, unpackable, validate_old_id):
self.unpackable = unpackable
self.validate_old_id = validate_old_id

def reset(self):
self.unpackable.reset()
Expand Down Expand Up @@ -731,7 +859,8 @@ def _unpack(self, verify_struct=True):
fields = [None] * size
for i in range(len(fields)):
fields[i] = self._unpack(verify_struct=True)
return Structure(tag, *fields, verified=verify_struct)
return Structure(tag, *fields, verified=verify_struct,
validate_old_id=self.validate_old_id)

elif marker == 0xDF: # END_OF_STREAM:
return EndOfStream
Expand Down Expand Up @@ -888,8 +1017,9 @@ def receive(self, sock, n_bytes):
class PackStream:
"""Chunked message reader/writer for PackStream messaging."""

def __init__(self, wire):
def __init__(self, wire, validate_old_id):
self.wire = wire
self.validate_old_id = validate_old_id
self.data_buffer = []
self.next_chunk_size = None

Expand All @@ -911,7 +1041,7 @@ def read_message(self):
break
buffer = UnpackableBuffer(b"".join(self.data_buffer))
self.data_buffer = []
unpacker = Unpacker(buffer)
unpacker = Unpacker(buffer, self.validate_old_id)
return unpacker.unpack_message()

def write_message(self, message):
Expand Down
Loading