Skip to content

Commit fb27629

Browse files
committed
Merge pull request #2 from civitaspo/v0.0.2
v0.0.2
2 parents c86fa81 + d8920cb commit fb27629

File tree

6 files changed

+332
-56
lines changed

6 files changed

+332
-56
lines changed

CHENGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
0.0.2 (2016-01-23)
2+
==================
3+
- incompatible change: option `column_name` is removed
4+
- add: option `column`
5+
- add: option `skip_if_null`
6+
- enhance: support json type
7+
18
0.0.1 (2016-01-22)
29
==================
310
- first release.

README.md

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,20 @@ Convert a record to jsonl.
88

99
## Configuration
1010

11-
- **column_name**: json column name (string, default: `"json_payload"`)
11+
- **column**: output json column (optional)
12+
- **name** (string, default: `"json_payload"`)
13+
- **type** string or json (string, default: `"string"`)
14+
- **skip_if_null**: input column name list (array of string, default: `[]`)
1215

1316
## Example
1417

1518
```yaml
1619
filters:
1720
- type: to_json
18-
column_name: json_column
21+
column:
22+
name: test
23+
type: string
24+
skip_if_null: [id]
1925
```
2026
2127
## Run Example
@@ -25,9 +31,6 @@ $ ./gradlew classpath
2531
$ embulk run -I lib example/config.yml
2632
```
2733

28-
## TODO
29-
- support json type
30-
3134
## Build
3235

3336
```

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ configurations {
1313
provided
1414
}
1515

16-
version = "0.0.1"
16+
version = "0.0.2"
1717

1818
sourceCompatibility = 1.7
1919
targetCompatibility = 1.7

example/config.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,9 @@ in:
1515
- {name: score, type: double}
1616
filters:
1717
- type: to_json
18+
column:
19+
name: test
20+
type: string
21+
skip_if_null: [id]
1822
out:
1923
type: stdout
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package org.embulk.filter.to_json;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.google.common.base.Optional;
6+
import com.google.common.collect.Lists;
7+
import com.google.common.collect.Maps;
8+
import org.embulk.config.ConfigException;
9+
import org.embulk.spi.Column;
10+
import org.embulk.spi.ColumnVisitor;
11+
import org.embulk.spi.PageBuilder;
12+
import org.embulk.spi.PageReader;
13+
import org.embulk.spi.Schema;
14+
import org.embulk.spi.json.JsonParser;
15+
import org.embulk.spi.type.Types;
16+
17+
import java.io.IOException;
18+
import java.util.List;
19+
import java.util.Map;
20+
21+
/**
22+
* Created by takahiro.nakayama on 1/23/16.
23+
*/
24+
public class ColumnVisitorToJsonImpl
25+
implements ColumnVisitor
26+
{
27+
private static class ColumnSetter
28+
{
29+
private final PageBuilder pageBuilder;
30+
private final JsonStringSetter jsonStringSetter;
31+
private final JsonParser jsonParser = new JsonParser();
32+
33+
interface JsonStringSetter
34+
{
35+
void set(String json);
36+
37+
void set(Column column, String json);
38+
39+
void set(int index, String json);
40+
}
41+
42+
public ColumnSetter(final PageBuilder pageBuilder, final Column outputColumn)
43+
{
44+
this.pageBuilder = pageBuilder;
45+
46+
if (Types.STRING.equals(outputColumn.getType())) {
47+
this.jsonStringSetter = new JsonStringSetter()
48+
{
49+
@Override
50+
public void set(String json)
51+
{
52+
set(outputColumn, json);
53+
}
54+
55+
@Override
56+
public void set(Column column, String json)
57+
{
58+
set(column.getIndex(), json);
59+
}
60+
61+
@Override
62+
public void set(int index, String json)
63+
{
64+
setAsString(index, json);
65+
}
66+
67+
private void setAsString(int index, String json)
68+
{
69+
pageBuilder.setString(index, json);
70+
}
71+
};
72+
}
73+
else if (Types.JSON.equals(outputColumn.getType())) {
74+
this.jsonStringSetter = new JsonStringSetter()
75+
{
76+
@Override
77+
public void set(String json)
78+
{
79+
set(outputColumn, json);
80+
}
81+
82+
@Override
83+
public void set(Column column, String json)
84+
{
85+
set(column.getIndex(), json);
86+
}
87+
88+
@Override
89+
public void set(int index, String json)
90+
{
91+
setAsJson(index, json);
92+
}
93+
94+
private void setAsJson(int index, String json)
95+
{
96+
pageBuilder.setJson(index, jsonParser.parse(json));
97+
}
98+
};
99+
}
100+
else {
101+
throw new ConfigException(String.format("Cannot convert JSON to type: %s", outputColumn.getType()));
102+
}
103+
}
104+
}
105+
106+
private static final ObjectMapper objectMapper = new ObjectMapper();
107+
private final Map<String, Object> map;
108+
private final PageReader pageReader;
109+
private final ColumnSetter columnSetter;
110+
private List<String> skipColumnsIfNull = Lists.newArrayList();
111+
private boolean skipRecordFlag = false;
112+
113+
ColumnVisitorToJsonImpl(PageReader pageReader, PageBuilder pageBuilder,
114+
Column outputColumn, List<String> skipColumnsIfNull)
115+
{
116+
this.map = Maps.newHashMap();
117+
this.pageReader = pageReader;
118+
this.columnSetter = new ColumnSetter(pageBuilder, outputColumn);
119+
this.skipColumnsIfNull = skipColumnsIfNull;
120+
}
121+
122+
@Override
123+
public void booleanColumn(Column column)
124+
{
125+
if (pageReader.isNull(column)) {
126+
setSkipRecordFlagIfNeeded(column);
127+
putNull(column);
128+
return;
129+
}
130+
map.put(column.getName(), pageReader.getBoolean(column));
131+
}
132+
133+
@Override
134+
public void longColumn(Column column)
135+
{
136+
if (pageReader.isNull(column)) {
137+
setSkipRecordFlagIfNeeded(column);
138+
putNull(column);
139+
return;
140+
}
141+
map.put(column.getName(), pageReader.getLong(column));
142+
}
143+
144+
@Override
145+
public void doubleColumn(Column column)
146+
{
147+
if (pageReader.isNull(column)) {
148+
setSkipRecordFlagIfNeeded(column);
149+
putNull(column);
150+
return;
151+
}
152+
map.put(column.getName(), pageReader.getDouble(column));
153+
}
154+
155+
@Override
156+
public void stringColumn(Column column)
157+
{
158+
if (pageReader.isNull(column)) {
159+
setSkipRecordFlagIfNeeded(column);
160+
putNull(column);
161+
return;
162+
}
163+
map.put(column.getName(), pageReader.getString(column));
164+
}
165+
166+
@Override
167+
public void timestampColumn(Column column)
168+
{
169+
if (pageReader.isNull(column)) {
170+
setSkipRecordFlagIfNeeded(column);
171+
putNull(column);
172+
return;
173+
}
174+
map.put(column.getName(), pageReader.getTimestamp(column).toString());
175+
}
176+
177+
@Override
178+
public void jsonColumn(Column column)
179+
{
180+
if (pageReader.isNull(column)) {
181+
setSkipRecordFlagIfNeeded(column);
182+
putNull(column);
183+
return;
184+
}
185+
186+
try {
187+
map.put(column.getName(), objectMapper.readTree(pageReader.getJson(column).toJson()));
188+
}
189+
catch (IOException e) {
190+
throw new RuntimeException(e);
191+
}
192+
}
193+
194+
private void putNull(Column column)
195+
{
196+
map.put(column.getName(), null);
197+
}
198+
199+
private void clear()
200+
{
201+
skipRecordFlag = false;
202+
map.clear();
203+
}
204+
205+
private void setSkipRecordFlagIfNeeded(Column column)
206+
{
207+
if (skipColumnsIfNull.contains(column.getName())) {
208+
skipRecordFlag = true;
209+
}
210+
}
211+
212+
private boolean shouldSkipRecord()
213+
{
214+
return skipRecordFlag;
215+
}
216+
217+
public void visit()
218+
{
219+
pageReader.getSchema().visitColumns(this);
220+
221+
try {
222+
if (!shouldSkipRecord()) {
223+
columnSetter.jsonStringSetter.set(buildJsonString());
224+
}
225+
}
226+
finally {
227+
clear();
228+
}
229+
}
230+
231+
private String buildJsonString()
232+
{
233+
try {
234+
return objectMapper.writeValueAsString(map);
235+
}
236+
catch (JsonProcessingException e) {
237+
throw new RuntimeException(e);
238+
}
239+
}
240+
}

0 commit comments

Comments
 (0)