Skip to content

Commit 628122f

Browse files
authored
add sync function for incremental update (#84)
* fishjam#1 add go.mod and fix build error(infini.sh/framework, fasthttp, util) * run following command to format code: gofmt -l -w . * fishjam#1 1.add Config: SortField, TruncateOutFile, SkipFields, so can dump es index to local file and compare. 2.add Config function: Sync , so can scroll and compare the source and dest index records, and just index/update/delete the changed records 3.refactor code: - add some functions in esapi, ClusterVersion() and DeleteScroll(), add ParseEsApi - move bulk.go to migrator.go, and add some functions - refactor all http method(GET/Post/DoRequest) to sinle Request method, and support proxy. - delete some commented and useless code * fix error while source index not exist and change log. * change model name * change log * update README.md and change the description of some configurations
1 parent a6a88ae commit 628122f

File tree

18 files changed

+1359
-1016
lines changed

18 files changed

+1359
-1016
lines changed

README.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Links:
2323
* Support specify which _source fields to return from source
2424
* Support specify query string query to filter the data source
2525
* Support rename source fields while do bulk indexing
26+
* Support incremental update(add/update/delete changed records) with `--sync`. Notice: it use different implementation, just handle the ***changed*** records, but not as fast as the old way
2627
* Load generating with
2728

2829
## ESM is fast!
@@ -69,6 +70,11 @@ copy index `src_index` from `192.168.1.x` to `192.168.1.y:9200` and save with `d
6970
./bin/esm -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index -w=5 -b=100
7071
```
7172

73+
use sync feature for incremental update index `src_index` from `192.168.1.x` to `192.168.1.y:9200`
74+
```
75+
./bin/esm --sync -s http://localhost:9200 -d http://localhost:9200 -x src_index -y dest_index
76+
```
77+
7278
support Basic-Auth
7379
```
7480
./bin/esm -s http://localhost:9200 -x "src_index" -y "dest_index" -d http://localhost:9201 -n admin:111111
@@ -91,6 +97,13 @@ dump elasticsearch documents into local file
9197
./bin/esm -s http://localhost:9200 -x "src_index" -m admin:111111 -c 5000 -q=query:mixer --refresh -o=dump.bin
9298
```
9399

100+
dump source and target index to local file and compare them, so can find the difference quickly
101+
```
102+
./bin/esm --sort=_id -s http://localhost:9200 -x "src_index" --truncate_output --skip=_index -o=src.json
103+
./bin/esm --sort=_id -s http://localhost:9200 -x "dst_index" --truncate_output --skip=_index -o=dst.json
104+
diff -W 200 -ry --suppress-common-lines src.json dst.json
105+
```
106+
94107
loading data from dump files, bulk insert to another es instance
95108
```
96109
./bin/esm -d http://localhost:9200 -y "dest_index" -n admin:111111 -c 5000 -b 5 --refresh -i=dump.bin
@@ -172,6 +185,7 @@ Usage:
172185
Application Options:
173186
-s, --source= source elasticsearch instance, ie: http://localhost:9200
174187
-q, --query= query against source elasticsearch instance, filter data before migrate, ie: name:medcl
188+
--sort= sort field when scroll, ie: _id (default: _id)
175189
-d, --dest= destination elasticsearch instance, ie: http://localhost:9201
176190
-m, --source_auth= basic auth of source elasticsearch instance, ie: user:pass
177191
-n, --dest_auth= basic auth of target elasticsearch instance, ie: user:pass
@@ -192,12 +206,15 @@ Application Options:
192206
--green wait for both hosts cluster status to be green before dump. otherwise yellow is okay
193207
-v, --log= setting log level,options:trace,debug,info,warn,error (INFO)
194208
-o, --output_file= output documents of source index into local file
209+
--truncate_output= truncate before dump to output file
195210
-i, --input_file= indexing from local dump file
196211
--input_file_type= the data type of input file, options: dump, json_line, json_array, log_line (dump)
197212
--source_proxy= set proxy to source http connections, ie: http://127.0.0.1:8080
198213
--dest_proxy= set proxy to target http connections, ie: http://127.0.0.1:8080
199214
--refresh refresh after migration finished
200-
--fields= filter source fields, comma separated, ie: col1,col2,col3,...
215+
--sync= sync will use scroll for both source and target index, compare the data and sync(index/update/delete)
216+
--fields= filter source fields(white list), comma separated, ie: col1,col2,col3,...
217+
--skip= skip source fields(black list), comma separated, ie: col1,col2,col3,...
201218
--rename= rename source fields, comma separated, ie: _type:type, name:myname
202219
-l, --logstash_endpoint= target logstash tcp endpoint, ie: 127.0.0.1:5055
203220
--secured_logstash_endpoint target logstash tcp endpoint was secured by TLS

buffer.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ limitations under the License.
1717
package main
1818

1919
import (
20-
"io"
2120
"errors"
21+
"io"
2222
)
2323

2424
//https://golangtc.com/t/5a49e2104ce40d740bbbc515
@@ -39,40 +39,40 @@ func (b *buffer) Len() int {
3939
return b.end - b.start
4040
}
4141

42-
//将有用的字节前移
42+
// 将有用的字节前移
4343
func (b *buffer) grow() {
4444
if b.start == 0 {
4545
return
4646
}
4747
copy(b.buf, b.buf[b.start:b.end])
4848
b.end -= b.start
49-
b.start = 0;
49+
b.start = 0
5050
}
5151

52-
//从reader里面读取数据,如果reader阻塞,会发生阻塞
52+
// 从reader里面读取数据,如果reader阻塞,会发生阻塞
5353
func (b *buffer) readFromReader() (int, error) {
5454
b.grow()
5555
n, err := b.reader.Read(b.buf[b.end:])
56-
if (err != nil) {
56+
if err != nil {
5757
return n, err
5858
}
5959
b.end += n
6060
return n, nil
6161
}
6262

63-
//返回n个字节,而不产生移位
63+
// 返回n个字节,而不产生移位
6464
func (b *buffer) seek(n int) ([]byte, error) {
6565
if b.end-b.start >= n {
66-
buf := b.buf[b.start:b.start+n]
66+
buf := b.buf[b.start : b.start+n]
6767
return buf, nil
6868
}
6969
return nil, errors.New("not enough")
7070
}
7171

72-
//舍弃offset个字段,读取n个字段
73-
func (b *buffer) read(offset, n int) ([]byte) {
72+
// 舍弃offset个字段,读取n个字段
73+
func (b *buffer) read(offset, n int) []byte {
7474
b.start += offset
75-
buf := b.buf[b.start:b.start+n]
75+
buf := b.buf[b.start : b.start+n]
7676
b.start += n
7777
return buf
7878
}

bulk.go

Lines changed: 0 additions & 183 deletions
This file was deleted.

domain.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ type ClusterHealth struct {
7777
Status string `json:"status,omitempty"`
7878
}
7979

80-
//{"took":23,"errors":true,"items":[{"create":{"_index":"mybank3","_type":"my_doc2","_id":"AWz8rlgUkzP-cujdA_Fv","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[AWz8rlgUkzP-cujdA_Fv]: version conflict, document already exists (current version [1])","index_uuid":"w9JZbJkfSEWBI-uluWorgw","shard":"0","index":"mybank3"}}},{"create":{"_index":"mybank3","_type":"my_doc4","_id":"AWz8rpF2kzP-cujdA_Fx","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc4]"}}},{"create":{"_index":"mybank3","_type":"my_doc1","_id":"AWz8rjpJkzP-cujdA_Fu","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc1]"}}},{"create":{"_index":"mybank3","_type":"my_doc3","_id":"AWz8rnbckzP-cujdA_Fw","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc3]"}}},{"create":{"_index":"mybank3","_type":"my_doc5","_id":"AWz8rrsEkzP-cujdA_Fy","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc5]"}}},{"create":{"_index":"mybank3","_type":"doc","_id":"3","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, doc]"}}}]}
80+
// {"took":23,"errors":true,"items":[{"create":{"_index":"mybank3","_type":"my_doc2","_id":"AWz8rlgUkzP-cujdA_Fv","status":409,"error":{"type":"version_conflict_engine_exception","reason":"[AWz8rlgUkzP-cujdA_Fv]: version conflict, document already exists (current version [1])","index_uuid":"w9JZbJkfSEWBI-uluWorgw","shard":"0","index":"mybank3"}}},{"create":{"_index":"mybank3","_type":"my_doc4","_id":"AWz8rpF2kzP-cujdA_Fx","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc4]"}}},{"create":{"_index":"mybank3","_type":"my_doc1","_id":"AWz8rjpJkzP-cujdA_Fu","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc1]"}}},{"create":{"_index":"mybank3","_type":"my_doc3","_id":"AWz8rnbckzP-cujdA_Fw","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc3]"}}},{"create":{"_index":"mybank3","_type":"my_doc5","_id":"AWz8rrsEkzP-cujdA_Fy","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, my_doc5]"}}},{"create":{"_index":"mybank3","_type":"doc","_id":"3","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [mybank3] as the final mapping would have more than 1 type: [my_doc2, doc]"}}}]}
8181
type BulkResponse struct {
8282
Took int `json:"took,omitempty"`
8383
Errors bool `json:"errors,omitempty"`
@@ -107,11 +107,12 @@ type Config struct {
107107
// config options
108108
SourceEs string `short:"s" long:"source" description:"source elasticsearch instance, ie: http://localhost:9200"`
109109
Query string `short:"q" long:"query" description:"query against source elasticsearch instance, filter data before migrate, ie: name:medcl"`
110+
SortField string `long:"sort" description:"sort field when scroll, ie: _id" default:"_id"`
110111
TargetEs string `short:"d" long:"dest" description:"destination elasticsearch instance, ie: http://localhost:9201"`
111112
SourceEsAuthStr string `short:"m" long:"source_auth" description:"basic auth of source elasticsearch instance, ie: user:pass"`
112113
TargetEsAuthStr string `short:"n" long:"dest_auth" description:"basic auth of target elasticsearch instance, ie: user:pass"`
113114
DocBufferCount int `short:"c" long:"count" description:"number of documents at a time: ie \"size\" in the scroll request" default:"10000"`
114-
BufferCount int `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"`
115+
BufferCount int `long:"buffer_count" description:"number of buffered documents in memory" default:"1000000"`
115116
Workers int `short:"w" long:"workers" description:"concurrency number for bulk workers" default:"1"`
116117
BulkSizeInMB int `short:"b" long:"bulk_size" description:"bulk size in MB" default:"5"`
117118
ScrollTime string `short:"t" long:"time" description:"scroll time" default:"10m"`
@@ -127,12 +128,15 @@ type Config struct {
127128
WaitForGreen bool `long:"green" description:"wait for both hosts cluster status to be green before dump. otherwise yellow is okay"`
128129
LogLevel string `short:"v" long:"log" description:"setting log level,options:trace,debug,info,warn,error" default:"INFO"`
129130
DumpOutFile string `short:"o" long:"output_file" description:"output documents of source index into local file" `
131+
TruncateOutFile bool `long:"truncate_output" description:"truncate before dump to output file" `
130132
DumpInputFile string `short:"i" long:"input_file" description:"indexing from local dump file" `
131133
InputFileType string `long:"input_file_type" description:"the data type of input file, options: dump, json_line, json_array, log_line" default:"dump" `
132134
SourceProxy string `long:"source_proxy" description:"set proxy to source http connections, ie: http://127.0.0.1:8080"`
133135
TargetProxy string `long:"dest_proxy" description:"set proxy to target http connections, ie: http://127.0.0.1:8080"`
134136
Refresh bool `long:"refresh" description:"refresh after migration finished"`
135-
Fields string `long:"fields" description:"filter source fields, comma separated, ie: col1,col2,col3,..." `
137+
Sync bool `long:"sync" description:"sync will use scroll for both source and target index, compare the data and sync(index/update/delete)"`
138+
Fields string `long:"fields" description:"filter source fields(white list), comma separated, ie: col1,col2,col3,..." `
139+
SkipFields string `long:"skip" description:"skip source fields(black list), comma separated, ie: col1,col2,col3,..." `
136140
RenameFields string `long:"rename" description:"rename source fields, comma separated, ie: _type:type, name:myname" `
137141
LogstashEndpoint string `short:"l" long:"logstash_endpoint" description:"target logstash tcp endpoint, ie: 127.0.0.1:5055" `
138142
LogstashSecEndpoint bool `long:"secured_logstash_endpoint" description:"target logstash tcp endpoint was secured by TLS" `

esapi.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@ package main
1818

1919
import "bytes"
2020

21-
type ESAPI interface{
21+
type ESAPI interface {
2222
ClusterHealth() *ClusterHealth
23-
Bulk(data *bytes.Buffer)
23+
ClusterVersion() *ClusterVersion
24+
Bulk(data *bytes.Buffer) error
2425
GetIndexSettings(indexNames string) (*Indexes, error)
25-
DeleteIndex(name string) (error)
26-
CreateIndex(name string,settings map[string]interface{}) (error)
27-
GetIndexMappings(copyAllIndexes bool,indexNames string)(string,int,*Indexes,error)
28-
UpdateIndexSettings(indexName string,settings map[string]interface{})(error)
29-
UpdateIndexMapping(indexName string,mappings map[string]interface{})(error)
30-
NewScroll(indexNames string,scrollTime string,docBufferCount int,query string, slicedId,maxSlicedCount int, fields string)(interface{}, error)
31-
NextScroll(scrollTime string,scrollId string)(interface{},error)
26+
DeleteIndex(name string) error
27+
CreateIndex(name string, settings map[string]interface{}) error
28+
GetIndexMappings(copyAllIndexes bool, indexNames string) (string, int, *Indexes, error)
29+
UpdateIndexSettings(indexName string, settings map[string]interface{}) error
30+
UpdateIndexMapping(indexName string, mappings map[string]interface{}) error
31+
NewScroll(indexNames string, scrollTime string, docBufferCount int, query string, sort string,
32+
slicedId int, maxSlicedCount int, fields string) (ScrollAPI, error)
33+
NextScroll(scrollTime string, scrollId string) (ScrollAPI, error)
34+
DeleteScroll(scrollId string) error
3235
Refresh(name string) (err error)
3336
}

0 commit comments

Comments
 (0)