78 changes: 29 additions & 49 deletions src/main/python/community-detector/community_detector.py
@@ -1,4 +1,3 @@
from collections import defaultdict
from itertools import chain
import logging

@@ -8,56 +7,41 @@


def build_matrix(id_to_buckets):
"""Builds a CSR matrix from a list of lists of buckets

Same code as in Apollo ConnectedComponentsModel.
https://github.com/src-d/apollo/blob/f51c5a92c24cbedd54b9b30bab02f03e51fd27b3/apollo/graph.py#L28
"""Builds a CSR matrix from a list of [elementid, buckets]

Args:
id_to_buckets: list of lists of buckets. The index is the element id
id_to_buckets: list of [elementid, buckets] pairs, sorted by elementid.

Returns:
A scipy.sparse.csr_matrix with the same contents
"""

if len(id_to_buckets) == 0:
if not id_to_buckets:
return csr_matrix((0, 0), dtype=numpy.uint8)

data = numpy.ones(sum(map(len, id_to_buckets)), dtype=numpy.uint8)
max_el_id = 0
data_size = 0
for item in id_to_buckets:
max_el_id = max(max_el_id, item[0])
data_size += len(item[1])

data = numpy.ones(data_size, dtype=numpy.uint8)
indices = numpy.zeros(len(data), dtype=numpy.uint32)
indptr = numpy.zeros(len(id_to_buckets) + 1, dtype=numpy.uint32)
indptr = numpy.zeros(max_el_id + 2, dtype=numpy.uint32)
pos = 0
for i, element in enumerate(id_to_buckets):
indices[pos:(pos + len(element))] = element
pos += len(element)
indptr[i + 1] = pos
return csr_matrix((data, indices, indptr))


def build_id_to_cc(connected_components, length):
"""Builds a ndarray that associates element id to a connected component

Same code as in Apollo ConnectedComponentsModel.
https://github.com/src-d/apollo/blob/f51c5a92c24cbedd54b9b30bab02f03e51fd27b3/apollo/graph.py#L28

Args:
connected_components: list of tuples (connected-component, element ids)
length: number of elements
from_el_id = 0
for el_id, bucket in id_to_buckets:
indices[pos:(pos + len(bucket))] = bucket
# fill indptr for any skipped element ids with the previous pos value
indptr[from_el_id + 1:el_id + 1] = pos
pos += len(bucket)
indptr[el_id + 1] = pos
from_el_id = el_id + 1

Returns:
A 1 dimension ndarray. The index will be the element id, and the
value is the connected component
"""

id_to_cc = numpy.zeros(length, dtype=numpy.uint32)
for cc, ids in connected_components:
for id_ in ids:
id_to_cc[id_] = cc

return id_to_cc
return csr_matrix((data, indices, indptr))
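
A minimal sketch (not part of the diff) of how the rewritten build_matrix handles gaps in the element ids: rows for skipped ids stay empty, because indptr keeps repeating the previous offset until the next element id appears.

from community_detector import build_matrix

m = build_matrix([[2, [0]], [3, [0, 2]]])
print(m.toarray())
# element ids 0 and 1 are absent from the input, so their rows are all zeros
# [[0 0 0]
#  [0 0 0]
#  [1 0 0]
#  [1 0 1]]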


def detect_communities(cc,
def detect_communities(ccs,
buckets_matrix,
edges="linear",
algorithm="walktrap",
@@ -68,8 +52,8 @@ def detect_communities(cc,
https://github.com/src-d/apollo/blob/6b370b5f34ba9e31cf3310e70a2eff35dd978faa/apollo/graph.py#L191

Args:
cc: list with the connected components. Index is the element id, the
value is the connected component
ccs: dict with the connected components. The key is the connected component,
the value is the list of element ids
buckets_matrix: scipy.sparse.csr_matrix with the buckets. One row for
each element, with a column for each bucket. If the element is in a
bucket, the corresponding row,column (element id, bucket id) is 1,
@@ -80,7 +64,7 @@ def detect_communities(cc,
- quadratic: slow, but surely fits all the algorithms.
algorithm: The community detection algorithm to apply.
algorithm_params: Parameters for the algorithm (**kwargs, JSON format).

Returns:
A list of communities. Each community is a list of element-ids
"""
@@ -93,11 +77,6 @@ def detect_communities(cc,
log = logging.getLogger("community-detector")
log.debug("Building the connected components")

ccs = defaultdict(list)

for i, c in enumerate(cc):
ccs[c].append(i)

buckindices = buckets_matrix.indices
buckindptr = buckets_matrix.indptr
total_nvertices = buckets_matrix.shape[0]
@@ -120,12 +99,12 @@ def detect_communities(cc,
fat_ccs.append(vertices)

log.debug("Building %d graphs", len(fat_ccs))
bucket_weights = buckets_matrix.sum(axis=0)

for vertices in fat_ccs:
if linear:
edges = []
weights = []
bucket_weights = buckets_matrix.sum(axis=0)
buckets = set()
for i in vertices:
for j in range(buckindptr[i], buckindptr[i + 1]):
@@ -142,8 +121,8 @@ def detect_communities(cc,
for j in range(buckindptr[i], buckindptr[i + 1]):
buckets.add(buckindices[j])
for bucket in buckets:
buckverts = \
buckmat_csc.indices[buckmat_csc.indptr[bucket]:buckmat_csc.indptr[bucket + 1]]
buckverts = buckmat_csc.indices[
buckmat_csc.indptr[bucket]:buckmat_csc.indptr[bucket + 1]]
for i, x in enumerate(buckverts):
for y in buckverts:
if x < y:
@@ -160,7 +139,8 @@ def detect_communities(cc,
log.debug("Launching the community detection")
detector = CommunityDetector(algorithm=algorithm, config=algorithm_params)

communities.extend(chain.from_iterable((detector(g) for g in graphs)))
for community in chain.from_iterable((detector(g) for g in graphs)):
communities.append([i for i in community if i < total_nvertices])

if len(communities) > 0:
log.debug("Overall communities: %d", len(communities))
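
Why the detector output is now trimmed to i < total_nvertices: with edges="linear", the buckets themselves appear to be added to the graph as extra vertices with ids at or above total_nvertices (the test fixture comment "apollo returns them" points the same way), so a detected community can mix element ids with bucket vertices. A minimal sketch of the filtering step, with made-up values:

total_nvertices = 5          # elements occupy ids 0..4, so any id >= 5 is a bucket vertex
raw_communities = [[1, 0, 6], [3, 4, 5]]

element_communities = [
    [i for i in community if i < total_nvertices]
    for community in raw_communities
]
# element_communities == [[1, 0], [3, 4]]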
24 changes: 6 additions & 18 deletions src/main/python/community-detector/report.py
@@ -8,34 +8,22 @@


def read_connected_components(filepath):
dict = pq.read_table(filepath).to_pydict()

ccs = dict['cc']
ids = dict['element_ids']
return list(zip(ccs, ids))
d = pq.read_table(filepath).to_pydict()
return dict(zip(d['cc'], d['element_ids']))


def read_buckets_matrix(filepath):
dict = pq.read_table(filepath).to_pydict()

id_to_buckets = dict['buckets']

return community_detector.build_matrix(id_to_buckets)
d = pq.read_table(filepath).to_pydict()
return community_detector.build_matrix(list(zip(d['elId'], d['buckets'])))


def main(dirpath):
connected_components = read_connected_components('%s/cc.parquet' % dirpath)

buckets_matrix = read_buckets_matrix('%s/buckets.parquet' % dirpath)
n_ids = buckets_matrix.shape[0]

# TODO (carlosms): Scala produces a map of cc->element-id,
# the lib requires element-id->cc, but only to convert it
# to cc->element-id. Easy change once everything is working.
id_to_cc = community_detector.build_id_to_cc(connected_components, n_ids)

# The result is a list of communities. Each community is a list of element-ids
coms = community_detector.detect_communities(id_to_cc, buckets_matrix)
coms = community_detector.detect_communities(connected_components,
buckets_matrix)
com_ids = list(range(len(coms)))

data = [pa.array(com_ids), pa.array(coms)]
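
For orientation, a sketch of the shapes the two rewritten readers produce; the values below are made up, only the column names (cc, element_ids, elId, buckets) come from the diff:

# read_connected_components: parquet columns -> {connected component: element ids}
d = {'cc': [0, 1], 'element_ids': [[1, 5, 0], [6, 7]]}
ccs = dict(zip(d['cc'], d['element_ids']))
# ccs == {0: [1, 5, 0], 1: [6, 7]}

# read_buckets_matrix: parquet columns -> [element id, buckets] pairs for build_matrix
d = {'elId': [0, 1, 5], 'buckets': [[0], [0, 2], [2]]}
id_to_buckets = list(zip(d['elId'], d['buckets']))
# id_to_buckets == [(0, [0]), (1, [0, 2]), (5, [2])]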
93 changes: 77 additions & 16 deletions src/main/python/community-detector/test_community_detector.py
@@ -1,11 +1,12 @@
from collections import defaultdict
import unittest
import os

import numpy
from numpy.testing import assert_array_equal
from scipy.sparse import csr_matrix

from community_detector import detect_communities
from community_detector import detect_communities, build_matrix

dirname = os.path.dirname(__file__)

@@ -23,28 +24,88 @@ def test_detect_communities(self):
with numpy.load("%s/fixtures/input.npz" % (dirname)) as input_npz:
buckets = build_csr_matrix(input_npz)
cc = input_npz['id_to_cc']
ccs = defaultdict(list)
for i, c in enumerate(cc):
ccs[c].append(i)

# Call community_detector
communities = detect_communities(cc.tolist(), buckets)

# Replaces CommunitiesModel().construct(communities, ccsmodel.id_to_element).save(output)
size = sum(map(len, communities))
data = numpy.zeros(size, dtype=numpy.uint32)
indptr = numpy.zeros(len(communities) + 1, dtype=numpy.int64)
pos = 0
for i, community in enumerate(communities):
data[pos:pos + len(community)] = community
pos += len(community)
indptr[i + 1] = pos
communities = detect_communities(ccs, buckets)

# Read npz output
with numpy.load("%s/fixtures/output.npz" % (dirname)) as output:
fixture_data = output['data']
fixture_indptr = output['indptr']
fixture_data = output['data']
fixture_communities = []
for i in range(len(fixture_indptr) - 1):
ptr_from = fixture_indptr[i]
ptr_to = fixture_indptr[i + 1]
community = fixture_data[ptr_from:ptr_to]
# filter out buckets from fixture (apollo returns them)
fixture_communities.append(
[j for j in community if j < len(cc)])

assert_array_equal(communities, fixture_communities)

def test_with_optimized_input(self):
# the Scala side removes elements that appear in only 1 bucket
id_to_buckets = [
[0, [0]],
[1, [0, 2]],
[5, [2]],
[6, [1]],
[7, [1]],
]
ccs = {
0: [1, 5, 0],
1: [6, 7]
}
buckets = build_matrix(id_to_buckets)
communities = detect_communities(ccs, buckets)

self.assertTrue(len(communities) == 3)
assert_array_equal(communities[0], [6, 7])
assert_array_equal(communities[1], [1, 0])
assert_array_equal(communities[2], [5])

# input without skipped ids should produce the same communities
id_to_buckets = [
[0, [0]],
[1, [0, 2]],
[2, [2]],
[3, [1]],
[4, [1]],
]
ccs = {
0: [1, 2, 0],
1: [3, 4]
}
buckets = build_matrix(id_to_buckets)
communities = detect_communities(ccs, buckets)

self.assertTrue(len(communities) == 3)
assert_array_equal(communities[0], [3, 4])
assert_array_equal(communities[1], [1, 0])
assert_array_equal(communities[2], [2])

def test_start_with_not_zero_input(self):
id_to_buckets = [
[2, [0]],
[3, [0, 2]],
[7, [2]],
[8, [1]],
[9, [1]],
]
ccs = {
0: [3, 7, 2],
1: [8, 9]
}
buckets = build_matrix(id_to_buckets)
communities = detect_communities(ccs, buckets)

# Assert equality
assert_array_equal(data, fixture_data)
assert_array_equal(indptr, fixture_indptr)
self.assertTrue(len(communities) == 3)
assert_array_equal(communities[0], [8, 9])
assert_array_equal(communities[1], [3, 2])
assert_array_equal(communities[2], [7])


if __name__ == '__main__':