# Results of testing embulk's resume-state and config-diff
These are the results of testing the `--resume-state` and `--config-diff` options of embulk v0.9.7.
I expect any other version that supports `--resume-state` and `--config-diff` to give the same results.
I used the Docker image https://hub.docker.com/r/tkuchiki/embulk/.
## resume-state
First, prepare the CSV files to load. Because I want to check whether only the file that failed gets loaded again, I make exactly one of them invalid.
```console
$ mkdir in out
$ echo "1,2" > in/1_valid.csv
$ echo "3 4" > in/2_invalid.csv
$ echo "5,6" > in/3_valid.csv
```
Prepare a configuration file like the following.
```console
$ cat config.yml
in:
  path_prefix: /opt/embulk/in/
  type: file
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: false
    allow_optional_columns: false
    stop_on_invalid_record: true
    columns:
    - {name: c0, type: long}
    - {name: c1, type: long}
out:
  type: stdout
```
With everything ready, run embulk with `--resume-state`.
```console
$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml --resume-state /opt/embulk/out/resume-state.yml
2018-08-09 09:14:50.084 +0000: Embulk v0.9.7
Gem plugin path is: /root/.embulk/lib/gems
Fetching: embulk-parser-jsonl-0.2.0.gem (100%)
Successfully installed embulk-parser-jsonl-0.2.0
1 gem installed
2018-08-09 09:15:00.100 +0000: Embulk v0.9.7
2018-08-09 09:15:01.328 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:15:04.917 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:15:06.031 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:15:06.097 +0000 [INFO] (0001:transaction): Listing local files at directory '/opt/embulk/in' filtering filename by prefix ''
2018-08-09 09:15:06.098 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-08-09 09:15:06.115 +0000 [INFO] (0001:transaction): Loading files [/opt/embulk/in/1_valid.csv, /opt/embulk/in/2_invalid.csv, /opt/embulk/in/3_valid.csv]
2018-08-09 09:15:06.172 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 6 = input tasks 3 * 2
2018-08-09 09:15:06.185 +0000 [INFO] (0001:transaction): {done: 0 / 3, running: 0}
1,2
5,6
2018-08-09 09:15:06.334 +0000 [INFO] (0001:transaction): {done: 3 / 3, running: 0}
2018-08-09 09:15:06.335 +0000 [INFO] (0001:transaction): {done: 3 / 3, running: 0}
2018-08-09 09:15:06.335 +0000 [INFO] (0001:transaction): {done: 3 / 3, running: 0}
2018-08-09 09:15:06.339 +0000 [INFO] (main): Writing resume state to '/opt/embulk/out/resume-state.yml'
2018-08-09 09:15:06.393 +0000 [INFO] (main): Resume state is written. Run the transaction again with -r option to resume or use "cleanup" subcommand to delete intermediate data.
java.lang.RuntimeException: org.embulk.spi.DataException: Invalid record at line 1: 3 4
	at org.embulk.EmbulkRunner.runInternal(EmbulkRunner.java:317)
	at org.embulk.EmbulkRunner.run(EmbulkRunner.java:156)
	at org.embulk.cli.EmbulkRun.runSubcommand(EmbulkRun.java:436)
	at org.embulk.cli.EmbulkRun.run(EmbulkRun.java:91)
	at org.embulk.cli.Main.main(Main.java:26)
Caused by: org.embulk.spi.DataException: Invalid record at line 1: 3 4
	at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:363)
	at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:140)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.runInputTask(LocalExecutorPlugin.java:271)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.access$000(LocalExecutorPlugin.java:196)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:235)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:232)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "3 4"
	at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:280)
	at org.embulk.spi.Column.visit(Column.java:48)
	at org.embulk.spi.Schema.visitColumns(Schema.java:68)
	at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:261)
	... 9 more
Caused by: java.lang.NumberFormatException: For input string: "3 4"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:277)
	... 12 more
Error: org.embulk.spi.DataException: Invalid record at line 1: 3 4
```
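The load failed because `in/2_invalid.csv` is, as its name says, an invalid CSV (the other two files were read successfully, which is why `1,2` and `5,6` were printed).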
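Why `3 4` is rejected follows from the parser settings: the delimiter is `,`, both columns are `long`, and `stop_on_invalid_record` is `true`. The sketch below is a simplified Python model of those settings, not embulk's `CsvParserPlugin`: it ignores quoting, escaping and charset handling, and `parse_line`, `parse_records` and `InvalidRecord` are hypothetical names. Because `3 4` contains no comma, the whole line becomes the value of `c0` and the integer conversion fails, much like `Long.parseLong` in the stack trace above.

```python
import re
from typing import Dict, List

# Simplified model of the parser settings in config.yml (NOT embulk's code):
# delimiter ',', columns c0/c1 of type long, no header line,
# allow_extra_columns: false, allow_optional_columns: false.
# Quoting and escaping are not handled at all.
COLUMNS = ["c0", "c1"]
LONG_PATTERN = re.compile(r"[+-]?[0-9]+")  # digits with an optional sign


class InvalidRecord(Exception):
    pass


def parse_line(line: str, line_no: int) -> Dict[str, int]:
    error = InvalidRecord(f"Invalid record at line {line_no}: {line}")
    fields = line.split(",")
    record = {}
    # Convert the columns in order, so a bad c0 is reported before anything else.
    for i, name in enumerate(COLUMNS):
        if i >= len(fields):  # missing column (allow_optional_columns: false)
            raise error
        text = fields[i]
        if not LONG_PATTERN.fullmatch(text):  # "3 4" fails here
            raise error
        value = int(text)
        if not -2**63 <= value < 2**63:  # outside the signed 64-bit range
            raise error
        record[name] = value
    if len(fields) > len(COLUMNS):  # extra column (allow_extra_columns: false)
        raise error
    return record


def parse_records(text: str, stop_on_invalid_record: bool) -> List[Dict[str, int]]:
    records = []
    for line_no, line in enumerate(text.splitlines(), start=1):
        try:
            records.append(parse_line(line, line_no))
        except InvalidRecord:
            if stop_on_invalid_record:
                raise  # true: the task fails, as in the run above
            # false: this line is skipped and parsing continues
    return records


print(parse_records("1,2\n", stop_on_invalid_record=True))  # -> [{'c0': 1, 'c1': 2}]
```

With `stop_on_invalid_record: false` the model would drop the bad line and keep going instead of failing the task.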
Because the run failed, the following resume-state file was written.
```console
$ cat out/resume-state.yml
exec_task: {transaction_time: '2018-08-09 09:15:06.033 UTC'}
in_task:
  DecoderTaskSources: []
  DecoderConfigs: []
  ParserTaskSource:
    SchemaConfig:
    - {name: c0, type: long}
    - {name: c1, type: long}
    Delimiter: ','
    QuotesInQuotedFields: ACCEPT_ONLY_RFC4180_ESCAPED
    Charset: UTF-8
    SkipHeaderLines: 0
    Newline: LF
    DefaultTimeZoneId: UTC
    HeaderLine: null
    StopOnInvalidRecord: true
    MaxQuotedSizeLimit: 131072
    DefaultTimestampFormat: '%Y-%m-%d %H:%M:%S.%N %z'
    CommentLineMarker: null
    TrimIfNotQuoted: false
    QuoteChar: '"'
    AllowOptionalColumns: false
    NullString: null
    DefaultDate: '1970-01-01'
    AllowExtraColumns: false
    EscapeChar: '"'
  FileInputTaskSource:
    LastPath: null
    FollowSymlinks: false
    PathPrefix: /opt/embulk/in/
    Files: [/opt/embulk/in/1_valid.csv, /opt/embulk/in/2_invalid.csv, /opt/embulk/in/3_valid.csv]
  ParserConfig:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: false
    allow_optional_columns: false
    stop_on_invalid_record: true
    columns:
    - {name: c0, type: long}
    - {name: c1, type: long}
out_task: {PrintsColumnNames: false, TimeZoneId: UTC}
in_schema:
- {index: 0, name: c0, type: long}
- {index: 1, name: c1, type: long}
out_schema:
- {index: 0, name: c0, type: long}
- {index: 1, name: c1, type: long}
in_reports:
- {}
- null
- {}
out_reports:
- {}
- {}
- null
- null
- {}
- {}
```
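In `in_reports`, entry 1 (`2_invalid.csv`) is `null` while entries 0 and 2 have reports, and the resumed run below logs `Skipped resumed input task 0` and `Skipped resumed input task 2`. Reading that as "a non-null report means the input task already finished" is my interpretation of this output, not documented behavior. Under that assumption, a small Python sketch can list the files that a resume would still load. The values are copied by hand from the YAML above (parsing the file directly would need a third-party YAML library), it assumes input task `i` corresponds to `Files[i]` (3 files, 3 input tasks), and `pending_files` is a hypothetical name.

```python
from typing import List, Optional, Tuple

# Values copied by hand from resume-state.yml above (YAML null -> None).
# Assumption: input task i corresponds to Files[i] (3 files, 3 input tasks).
FILES = [
    "/opt/embulk/in/1_valid.csv",
    "/opt/embulk/in/2_invalid.csv",
    "/opt/embulk/in/3_valid.csv",
]
IN_REPORTS: List[Optional[dict]] = [{}, None, {}]


def pending_files(files: List[str], in_reports: List[Optional[dict]]) -> List[Tuple[int, str]]:
    """Return (task index, file) for every input task whose report is still null."""
    return [
        (index, path)
        for index, (path, report) in enumerate(zip(files, in_reports))
        if report is None
    ]


print(pending_files(FILES, IN_REPORTS))  # -> [(1, '/opt/embulk/in/2_invalid.csv')]
```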
Rewrite the invalid CSV as a valid one and load again with the same `--resume-state` file.
```console
$ echo "3,4" > in/2_invalid.csv
$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml --resume-state /opt/embulk/out/resume-state.yml
2018-08-09 09:16:15.729 +0000: Embulk v0.9.7
2018-08-09 09:16:17.284 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:16:21.046 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:16:22.230 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:16:22.345 +0000 [INFO] (0001:resume): Using local thread executor with max_threads=8 / output tasks 6 = input tasks 3 * 2
2018-08-09 09:16:22.351 +0000 [WARN] (0001:resume): Skipped resumed input task 0
2018-08-09 09:16:22.352 +0000 [WARN] (0001:resume): Skipped resumed input task 2
2018-08-09 09:16:22.352 +0000 [INFO] (0001:resume): {done: 2 / 3, running: 0}
3,4
2018-08-09 09:16:22.438 +0000 [INFO] (0001:resume): {done: 2 / 3, running: 1}
2018-08-09 09:16:22.458 +0000 [INFO] (main): Committed.
2018-08-09 09:16:22.459 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/opt/embulk/in/3_valid.csv"},"out":{}}
```
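The run succeeded. The files that had already been loaded successfully in the first run are not output again; only `3,4` from the fixed file is printed.
When the run succeeds, the resume-state file is deleted.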
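Since the state file is written on failure and deleted on success, and the very first run above already passed `--resume-state` for a file that did not exist yet, the same `run ... -r` command both starts a fresh run and resumes a failed one. The log message also mentions a `cleanup` subcommand for giving up. A minimal Python sketch of that decision follows; `next_command` and the `give_up` flag are hypothetical, and it targets a locally installed `embulk` rather than the Docker invocation used here.

```python
import os
from typing import List, Optional


def next_command(config_path: str, state_path: str, give_up: bool = False) -> Optional[List[str]]:
    """Hypothetical helper: decide the next embulk command around a resume-state file."""
    if give_up:
        # Discard the intermediate data of a failed transaction, if one is pending.
        if os.path.exists(state_path):
            return ["embulk", "cleanup", config_path, "-r", state_path]
        return None  # no state file: nothing to clean up
    # The same command starts a fresh run (no state file yet, like the first
    # run above) and resumes a failed one (state file present).
    return ["embulk", "run", config_path, "-r", state_path]


print(next_command("config.yml", "resume-state.yml"))
# -> ['embulk', 'run', 'config.yml', '-r', 'resume-state.yml']
```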
## config-diff
I use the same config.yml as in the resume-state test, and delete all files under the in and out directories.
Then create one CSV file and run embulk with `-c` (the short form of `--config-diff`).
```console
$ echo "1,2" > in/1.csv
$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml -c /opt/embulk/out/diff.yml
2018-08-09 09:33:14.704 +0000: Embulk v0.9.7
2018-08-09 09:33:16.178 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:33:20.000 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:33:21.471 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:33:21.567 +0000 [INFO] (0001:transaction): Listing local files at directory '/opt/embulk/in' filtering filename by prefix ''
2018-08-09 09:33:21.571 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-08-09 09:33:21.591 +0000 [INFO] (0001:transaction): Loading files [/opt/embulk/in/1.csv]
2018-08-09 09:33:21.647 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-08-09 09:33:21.668 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
1,2
2018-08-09 09:33:21.790 +0000 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2018-08-09 09:33:21.803 +0000 [INFO] (main): Committed.
2018-08-09 09:33:21.803 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/opt/embulk/in/1.csv"},"out":{}}
```
The diff file records the file that was loaded as `last_path`.
```console
$ cat out/diff.yml
in: {last_path: /opt/embulk/in/1.csv}
out: {}
```
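The same diff also appears, as JSON, on the `Next config diff:` log line. If you capture embulk's log, a minimal Python sketch like the following could pull the diff out of it. The regular expression is fitted to the log format seen in this post, which is an assumption that may not hold for other versions, and `next_config_diff` is a hypothetical helper.

```python
import json
import re
from typing import Optional

# Assumed format, taken from the log lines in this post, e.g.
# ... [INFO] (main): Next config diff: {"in":{"last_path":"/opt/embulk/in/1.csv"},"out":{}}
NEXT_DIFF = re.compile(r"Next config diff: (\{.*\})\s*$")


def next_config_diff(log_text: str) -> Optional[dict]:
    """Return the last config diff printed in the log as a dict, or None if absent."""
    diff = None
    for line in log_text.splitlines():
        match = NEXT_DIFF.search(line)
        if match:
            diff = json.loads(match.group(1))
    return diff


log = ('2018-08-09 09:33:21.803 +0000 [INFO] (main): Next config diff: '
       '{"in":{"last_path":"/opt/embulk/in/1.csv"},"out":{}}')
print(next_config_diff(log)["in"]["last_path"])  # -> /opt/embulk/in/1.csv
```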
Add one more CSV file and run embulk again with the same diff file.
```console
$ echo "3,4" > in/2.csv
$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml -c /opt/embulk/out/diff.yml
2018-08-09 09:35:33.941 +0000: Embulk v0.9.7
2018-08-09 09:35:35.573 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:35:39.302 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:35:40.560 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:35:40.625 +0000 [INFO] (0001:transaction): Listing local files at directory '/opt/embulk/in' filtering filename by prefix ''
2018-08-09 09:35:40.627 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-08-09 09:35:40.644 +0000 [INFO] (0001:transaction): Loading files [/opt/embulk/in/2.csv]
2018-08-09 09:35:40.694 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-08-09 09:35:40.704 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
3,4
2018-08-09 09:35:40.827 +0000 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2018-08-09 09:35:40.833 +0000 [INFO] (main): Committed.
2018-08-09 09:35:40.833 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/opt/embulk/in/2.csv"},"out":{}}
```
`in/1.csv`, which was loaded in the first run, is not processed; only the newly added `in/2.csv` is.
The diff file has been updated:
```console
$ cat out/diff.yml
in: {last_path: /opt/embulk/in/2.csv}
out: {}
```
I omit the output, but running embulk again in this state finishes without processing anything.
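The three runs are consistent with a simple model of `last_path`: list the files, keep only paths that sort after `last_path`, and record the greatest loaded path for the next diff. The Python sketch below encodes that model; treating the comparison as plain lexicographic string ordering, and keeping `last_path` unchanged when nothing is loaded, are assumptions for illustration, and `select_files` is a hypothetical name.

```python
from typing import List, Optional, Tuple


def select_files(all_paths: List[str], last_path: Optional[str]) -> Tuple[List[str], Optional[str]]:
    """Return (files to load, last_path for the next config diff).

    Assumed model: only paths that sort lexicographically after last_path are
    loaded, and the greatest loaded path becomes the next last_path.
    """
    files = sorted(p for p in all_paths if last_path is None or p > last_path)
    # Assumption: when nothing is loaded, the previous last_path is kept.
    next_last = files[-1] if files else last_path
    return files, next_last


one = "/opt/embulk/in/1.csv"
two = "/opt/embulk/in/2.csv"
print(select_files([one], None))       # run 1 -> (['/opt/embulk/in/1.csv'], '/opt/embulk/in/1.csv')
print(select_files([one, two], one))   # run 2 -> (['/opt/embulk/in/2.csv'], '/opt/embulk/in/2.csv')
print(select_files([one, two], two))   # run 3 -> ([], '/opt/embulk/in/2.csv')
```

If this model holds, the comparison is by string, so a file added later as `in/10.csv` would sort before `/opt/embulk/in/2.csv` and be skipped; zero-padded or timestamp-based file names would avoid that.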
Note that resume-state and config-diff only work if the plugins you use support them.
For example, embulk-output-bigquery does not support resume-state, and while it does write a diff with config-diff, nothing was executed even after I added files.