Skip to content

Commit 841499a

Browse files
committed
first implementation of arrow table as output of traveltimeMatrix
1 parent 0f140f4 commit 841499a

File tree

4 files changed

+464
-58
lines changed

4 files changed

+464
-58
lines changed
Lines changed: 299 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
package org.ipea.r5r.Process;
2+
3+
import com.conveyal.r5.analyst.FreeFormPointSet;
4+
import com.conveyal.r5.analyst.cluster.RegionalTask;
5+
import com.conveyal.r5.analyst.scenario.Scenario;
6+
import com.conveyal.r5.api.util.LegMode;
7+
import com.conveyal.r5.api.util.TransitModes;
8+
import com.conveyal.r5.profile.StreetMode;
9+
import com.conveyal.r5.transit.TransportNetwork;
10+
import org.apache.arrow.memory.BufferAllocator;
11+
import org.apache.arrow.memory.RootAllocator;
12+
import org.apache.arrow.vector.types.pojo.Schema;
13+
import org.ipea.r5r.RoutingProperties;
14+
import org.ipea.r5r.Utils.Utils;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
import java.io.*;
19+
import java.nio.channels.WritableByteChannel;
20+
import java.text.ParseException;
21+
import java.time.LocalDate;
22+
import java.util.*;
23+
import java.util.concurrent.*;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
26+
import static java.lang.Math.max;
27+
import static java.nio.channels.Channels.newChannel;
28+
29+
public abstract class ArrowR5Process {
30+
31+
protected final ForkJoinPool r5rThreadPool;
32+
protected final TransportNetwork transportNetwork;
33+
protected final RoutingProperties routingProperties;
34+
35+
protected String[] fromIds;
36+
protected double[] fromLats;
37+
protected double[] fromLons;
38+
protected int nOrigins;
39+
40+
protected String[] toIds;
41+
protected double[] toLats;
42+
protected double[] toLons;
43+
protected String[] opportunities;
44+
protected int[][] opportunityCounts;
45+
protected int nDestinations;
46+
47+
protected FreeFormPointSet[] destinationPoints;
48+
49+
protected EnumSet<LegMode> directModes;
50+
protected EnumSet<TransitModes> transitModes;
51+
protected EnumSet<LegMode> accessModes;
52+
protected EnumSet<LegMode> egressModes;
53+
54+
protected String departureDate;
55+
protected String departureTime;
56+
protected int secondsFromMidnight;
57+
58+
protected int maxWalkTime;
59+
protected int maxBikeTime;
60+
protected int maxCarTime;
61+
protected int maxTripDuration;
62+
63+
protected Schema schema;
64+
protected BufferAllocator parentAllocator;
65+
66+
protected abstract boolean isOneToOne();
67+
68+
private static final Logger LOG = LoggerFactory.getLogger(ArrowR5Process.class);
69+
70+
public ArrowR5Process(ForkJoinPool threadPool, TransportNetwork transportNetwork, RoutingProperties routingProperties) {
71+
this.r5rThreadPool = threadPool;
72+
this.transportNetwork = transportNetwork;
73+
this.routingProperties = routingProperties;
74+
75+
destinationPoints = null;
76+
}
77+
78+
public byte[] run() throws ExecutionException, InterruptedException {
79+
buildDestinationPointSet();
80+
buildSchemaStructure();
81+
82+
final BlockingQueue<BatchWithSeq> queue = new ArrayBlockingQueue<>(Math.min(nOrigins, 256));
83+
84+
try (BufferAllocator parentAllocator = new RootAllocator()) {
85+
this.parentAllocator = parentAllocator;
86+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
87+
WritableByteChannel channel = newChannel(baos);
88+
89+
Collector collector = new Collector(parentAllocator, schema, queue, channel, nOrigins);
90+
Thread collectorThread = new Thread(collector, "arrow-collector");
91+
collectorThread.start();
92+
93+
AtomicInteger totalProcessed = new AtomicInteger(1);
94+
final List<ForkJoinTask<?>> tasks = new ArrayList<>(nOrigins);
95+
96+
for (int index = 0; index < nOrigins; index++) {
97+
final int originIndex = index;
98+
tasks.add(r5rThreadPool.submit(() -> {
99+
BatchWithSeq b = tryRunProcess(totalProcessed, originIndex);
100+
try {
101+
queue.put(b);
102+
} catch (InterruptedException ie) {
103+
Thread.currentThread().interrupt();
104+
LOG.error("Interrupted while enqueuing batch for origin {}", originIndex, ie);
105+
throw new RuntimeException("Interrupted while enqueuing batch for origin " + originIndex, ie);
106+
}
107+
}));
108+
}
109+
110+
for (ForkJoinTask<?> t : tasks) {
111+
t.get();
112+
}
113+
114+
collectorThread.join();
115+
LOG.info(".. DONE! All batches written: {} origins.", nOrigins);
116+
return baos.toByteArray();
117+
}
118+
}
119+
120+
protected abstract void buildSchemaStructure();
121+
122+
protected void buildDestinationPointSet() {
123+
destinationPoints = new FreeFormPointSet[this.opportunities.length];
124+
125+
for (int i = 0; i < this.opportunities.length; i++) {
126+
ByteArrayOutputStream dataStream = new ByteArrayOutputStream();
127+
DataOutputStream pointStream = new DataOutputStream(dataStream);
128+
129+
try {
130+
pointStream.writeInt(toIds.length);
131+
for (String toId : toIds) {
132+
pointStream.writeUTF(toId);
133+
}
134+
for (double toLat : toLats) {
135+
pointStream.writeDouble(toLat);
136+
}
137+
for (double toLon : toLons) {
138+
pointStream.writeDouble(toLon);
139+
}
140+
for (int opportunity : opportunityCounts[i]) {
141+
pointStream.writeDouble(opportunity);
142+
}
143+
} catch (IOException e) {
144+
e.printStackTrace();
145+
}
146+
147+
ByteArrayInputStream pointsInput = new ByteArrayInputStream(dataStream.toByteArray());
148+
149+
try {
150+
destinationPoints[i] = new FreeFormPointSet(pointsInput);
151+
} catch (IOException e) {
152+
e.printStackTrace();
153+
}
154+
155+
if (!this.directModes.isEmpty()) {
156+
for (LegMode mode : this.directModes) {
157+
transportNetwork.linkageCache.getLinkage(destinationPoints[i], transportNetwork.streetLayer, StreetMode.valueOf(mode.toString()));
158+
}
159+
}
160+
}
161+
}
162+
163+
public void setDestinations(String[] toIds, double[] toLats, double[] toLons) {
164+
int[][] opportunityCounts = new int[1][toIds.length];
165+
for (int i = 0; i < toIds.length; i++) opportunityCounts[0][i] = 0;
166+
167+
String[] opportunities = new String[]{"all"};
168+
169+
setDestinations(toIds, toLats, toLons, opportunities, opportunityCounts);
170+
}
171+
172+
public void setDestinations(String[] toIds, double[] toLats, double[] toLons, String[] opportunities, int[][] opportunityCounts) {
173+
this.toIds = toIds;
174+
this.toLats = toLats;
175+
this.toLons = toLons;
176+
this.opportunities = opportunities;
177+
this.opportunityCounts = opportunityCounts;
178+
179+
this.nDestinations = toIds.length;
180+
181+
// set maxDestinations in R5 for detailed path information retrieval
182+
// PathResult.maxDestinations does not exist in R5 anymore
183+
//PathResult.maxDestinations = this.nDestinations;
184+
}
185+
186+
public void setOrigins(String[] fromIds, double[] fromLats, double[] fromLons) {
187+
this.fromIds = fromIds;
188+
this.fromLats = fromLats;
189+
this.fromLons = fromLons;
190+
191+
this.nOrigins = fromIds.length;
192+
}
193+
194+
public void setModes(String directModes, String accessModes, String transitModes, String egressModes) {
195+
this.directModes = Utils.setLegModes(directModes);
196+
this.accessModes = Utils.setLegModes(accessModes);
197+
this.egressModes = Utils.setLegModes(egressModes);
198+
this.transitModes = Utils.setTransitModes(transitModes);
199+
}
200+
201+
public void setDepartureDateTime(String departureDate, String departureTime) {
202+
this.departureDate = departureDate;
203+
this.departureTime = departureTime;
204+
}
205+
206+
public void setTripDuration(int maxWalkTime, int maxBikeTime, int maxCarTime, int maxTripDuration) {
207+
this.maxWalkTime = maxWalkTime;
208+
this.maxBikeTime = maxBikeTime;
209+
this.maxCarTime = maxCarTime;
210+
this.maxTripDuration = maxTripDuration;
211+
}
212+
213+
private BatchWithSeq tryRunProcess(AtomicInteger totalProcessed, int index) {
214+
BatchWithSeq results = null;
215+
try {
216+
long start = System.currentTimeMillis();
217+
results = runProcess(index);
218+
LOG.debug("Processed origin {} in {}ms.", nOrigins, max(System.currentTimeMillis() - start, 0L));
219+
220+
221+
// TODO Add saving to file
222+
// if (Utils.saveOutputToCsv & results != null) {
223+
// String filename = getCsvFilename(index);
224+
// results.saveToCsv(filename);
225+
// results.clear();
226+
// }
227+
228+
LOG.info("{} out of {} origins processed.", totalProcessed.getAndIncrement(), nOrigins);
229+
230+
} catch (ParseException e) {
231+
LOG.error(String.valueOf(e));
232+
}
233+
234+
// return Utils.saveOutputToCsv ? null : results;
235+
return results;
236+
}
237+
238+
private String getCsvFilename(int index) {
239+
String filename;
240+
if (this.isOneToOne()) {
241+
// one-to-one functions, such as detailed itineraries
242+
// save one file per origin-destination pair
243+
filename = Utils.outputCsvFolder + "/from_" + fromIds[index] + "_to_" + toIds[index] + ".csv";
244+
} else {
245+
// one-to-many functions, such as travel time matrix
246+
// save one file per origin
247+
filename = Utils.outputCsvFolder + "/from_" + fromIds[index] + ".csv";
248+
}
249+
return filename;
250+
}
251+
252+
protected abstract BatchWithSeq runProcess(int index) throws ParseException;
253+
254+
protected RegionalTask buildRequest(int index) throws ParseException {
255+
RegionalTask request = new RegionalTask();
256+
257+
request.scenario = new Scenario();
258+
request.scenario.id = "id";
259+
request.scenarioId = request.scenario.id;
260+
261+
request.zoneId = transportNetwork.getTimeZone();
262+
request.fromLat = fromLats[index];
263+
request.fromLon = fromLons[index];
264+
request.walkSpeed = (float) routingProperties.walkSpeed;
265+
request.bikeSpeed = (float) routingProperties.bikeSpeed;
266+
request.streetTime = maxTripDuration;
267+
request.maxWalkTime = maxWalkTime;
268+
request.maxBikeTime = maxBikeTime;
269+
request.maxCarTime = maxCarTime;
270+
request.maxTripDurationMinutes = maxTripDuration;
271+
request.makeTauiSite = false;
272+
request.recordTimes = true;
273+
request.recordAccessibility = false;
274+
request.maxRides = routingProperties.maxRides;
275+
request.bikeTrafficStress = routingProperties.maxLevelTrafficStress;
276+
277+
request.directModes = directModes;
278+
request.accessModes = accessModes;
279+
request.egressModes = egressModes;
280+
request.transitModes = transitModes;
281+
282+
request.date = LocalDate.parse(departureDate);
283+
284+
secondsFromMidnight = Utils.getSecondsFromMidnight(departureTime);
285+
286+
request.fromTime = secondsFromMidnight;
287+
request.toTime = secondsFromMidnight + (routingProperties.timeWindowSize * 60) ;
288+
289+
request.monteCarloDraws = routingProperties.numberOfMonteCarloDraws;
290+
request.suboptimalMinutes = routingProperties.suboptimalMinutes;
291+
292+
request.percentiles = routingProperties.percentiles;
293+
294+
request.inRoutingFareCalculator = routingProperties.fareCalculator;
295+
request.maxFare = Math.round(routingProperties.maxFare * 100.0f);
296+
297+
return request;
298+
}
299+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.ipea.r5r.Process;
2+
3+
import org.apache.arrow.memory.BufferAllocator;
4+
import org.apache.arrow.vector.VectorSchemaRoot;
5+
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
6+
import org.apache.arrow.vector.types.pojo.Schema;
7+
8+
final class Collector implements Runnable {
9+
private final BufferAllocator parentAlloc;
10+
private final Schema schema;
11+
private final java.util.concurrent.BlockingQueue<BatchWithSeq> in;
12+
private final java.nio.channels.WritableByteChannel channel;
13+
private final int expectedBatches; // nOrigins
14+
private volatile boolean started = false;
15+
16+
Collector(BufferAllocator parentAlloc,
17+
Schema schema,
18+
java.util.concurrent.BlockingQueue<BatchWithSeq> in,
19+
java.nio.channels.WritableByteChannel channel,
20+
int expectedBatches) {
21+
this.parentAlloc = parentAlloc;
22+
this.schema = schema;
23+
this.in = in;
24+
this.channel = channel;
25+
this.expectedBatches = expectedBatches;
26+
}
27+
28+
@Override public void run() {
29+
started = true;
30+
try (BufferAllocator alloc = parentAlloc.newChildAllocator("collector", 0, Long.MAX_VALUE);
31+
VectorSchemaRoot writerRoot = VectorSchemaRoot.create(schema, alloc);
32+
org.apache.arrow.vector.ipc.ArrowStreamWriter writer =
33+
new org.apache.arrow.vector.ipc.ArrowStreamWriter(writerRoot, null, channel)) {
34+
35+
writer.start();
36+
org.apache.arrow.vector.VectorLoader loader = new org.apache.arrow.vector.VectorLoader(writerRoot);
37+
38+
int written = 0;
39+
int nextSeq = 0;
40+
java.util.Map<Integer, BatchWithSeq> buffer = new java.util.HashMap<>();
41+
42+
while (written < expectedBatches) {
43+
BatchWithSeq item = in.poll(250, java.util.concurrent.TimeUnit.MILLISECONDS);
44+
if (item != null) {
45+
buffer.put(item.seq, item);
46+
// drain in order if possible
47+
while (buffer.containsKey(nextSeq)) {
48+
try (ArrowRecordBatch b = buffer.remove(nextSeq).batch) {
49+
loader.load(b);
50+
writer.writeBatch();
51+
written++;
52+
nextSeq++;
53+
}
54+
}
55+
}
56+
}
57+
58+
writer.end();
59+
} catch (InterruptedException ie) {
60+
Thread.currentThread().interrupt();
61+
throw new RuntimeException("Collector interrupted", ie);
62+
} catch (Exception e) {
63+
throw new RuntimeException("Collector failure", e);
64+
}
65+
}
66+
67+
public boolean isStarted() { return started; }
68+
}
69+

0 commit comments

Comments
 (0)