Verification results for embulk's resume-state and config-diff

These are the results of testing --resume-state and --config-diff on embulk v0.9.7.
I expect versions other than 0.9.7 to behave the same way, as long as they support --resume-state and --config-diff.
I used the docker image from https://hub.docker.com/r/tkuchiki/embulk/ for the tests.

resume-state

First, prepare the CSV files to load. I want to confirm that only the failed file is reloaded, so exactly one of them is an invalid CSV.

```console
$ mkdir in out
$ echo "1,2" > in/1_valid.csv
$ echo "3 4" > in/2_invalid.csv
$ echo "5,6" > in/3_valid.csv
```

Prepare a configuration file like the following.

```console
$ cat config.yml
in:
  path_prefix: /opt/embulk/in/
  type: file
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: false
    allow_optional_columns: false
    stop_on_invalid_record: true
    columns:
    - {name: c0, type: long}
    - {name: c1, type: long}
out:
  type: stdout
```
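As a rough pre-flight check, the sketch below mimics what the config above asks of each row: exactly two comma-separated fields, both parseable as integers (c0 and c1 are type long, and allow_extra_columns / allow_optional_columns are false). The helper name `is_valid_row_file` is hypothetical, and the check is a simplification assumed for illustration: it ignores quoting and escaping, so it is not how embulk's CSV parser actually works.

```shell
#!/bin/sh
# Simplified stand-in for the CSV parser settings above (NOT embulk's logic):
# every line must have exactly two comma-separated integer fields.
# Hypothetical helper; quoting and escaping are ignored.
is_valid_row_file() {
  awk -F',' '
    NF != 2 { bad = 1 }
    { for (i = 1; i <= NF; i++) if ($i !~ /^-?[0-9]+$/) bad = 1 }
    END { exit bad + 0 }
  ' "$1"
}
```

With the files created earlier, `is_valid_row_file in/2_invalid.csv` fails, because `3 4` contains no comma and is therefore read as a single non-integer field.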

With everything ready, run embulk.

```console
$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml --resume-state /opt/embulk/out/resume-state.yml
2018-08-09 09:14:50.084 +0000: Embulk v0.9.7
Gem plugin path is: /root/.embulk/lib/gems
Fetching: embulk-parser-jsonl-0.2.0.gem (100%)
Successfully installed embulk-parser-jsonl-0.2.0
1 gem installed
2018-08-09 09:15:00.100 +0000: Embulk v0.9.7
2018-08-09 09:15:01.328 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:15:04.917 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:15:06.031 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:15:06.097 +0000 [INFO] (0001:transaction): Listing local files at directory '/opt/embulk/in' filtering filename by prefix ''
2018-08-09 09:15:06.098 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-08-09 09:15:06.115 +0000 [INFO] (0001:transaction): Loading files [/opt/embulk/in/1_valid.csv, /opt/embulk/in/2_invalid.csv, /opt/embulk/in/3_valid.csv]
2018-08-09 09:15:06.172 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 6 = input tasks 3 * 2
2018-08-09 09:15:06.185 +0000 [INFO] (0001:transaction): {done: 0 / 3, running: 0}
1,2
5,6
2018-08-09 09:15:06.334 +0000 [INFO] (0001:transaction): {done: 3 / 3, running: 0}
2018-08-09 09:15:06.335 +0000 [INFO] (0001:transaction): {done: 3 / 3, running: 0}
2018-08-09 09:15:06.335 +0000 [INFO] (0001:transaction): {done: 3 / 3, running: 0}
2018-08-09 09:15:06.339 +0000 [INFO] (main): Writing resume state to '/opt/embulk/out/resume-state.yml'
2018-08-09 09:15:06.393 +0000 [INFO] (main): Resume state is written. Run the transaction again with -r option to resume or use "cleanup" subcommand to delete intermediate data.
java.lang.RuntimeException: org.embulk.spi.DataException: Invalid record at line 1: 3 4
    at org.embulk.EmbulkRunner.runInternal(EmbulkRunner.java:317)
    at org.embulk.EmbulkRunner.run(EmbulkRunner.java:156)
    at org.embulk.cli.EmbulkRun.runSubcommand(EmbulkRun.java:436)
    at org.embulk.cli.EmbulkRun.run(EmbulkRun.java:91)
    at org.embulk.cli.Main.main(Main.java:26)
Caused by: org.embulk.spi.DataException: Invalid record at line 1: 3 4
    at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:363)
    at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:140)
    at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.runInputTask(LocalExecutorPlugin.java:271)
    at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.access$000(LocalExecutorPlugin.java:196)
    at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:235)
    at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:232)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.embulk.standards.CsvParserPlugin$CsvRecordValidateException: java.lang.NumberFormatException: For input string: "3 4"
    at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:280)
    at org.embulk.spi.Column.visit(Column.java:48)
    at org.embulk.spi.Schema.visitColumns(Schema.java:68)
    at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:261)
    ... 9 more
Caused by: java.lang.NumberFormatException: For input string: "3 4"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.embulk.standards.CsvParserPlugin$1.longColumn(CsvParserPlugin.java:277)
    ... 12 more
Error: org.embulk.spi.DataException: Invalid record at line 1: 3 4
```

in/2_invalid.csv is, as its name says, an invalid CSV, so loading it failed (the other two files succeeded).
Because the run failed, the following resume-state file was written.

```console
$ cat out/resume-state.yml
exec_task: {transaction_time: '2018-08-09 09:15:06.033 UTC'}
in_task:
  DecoderTaskSources: []
  DecoderConfigs: []
  ParserTaskSource:
    SchemaConfig:
    - {name: c0, type: long}
    - {name: c1, type: long}
    Delimiter: ','
    QuotesInQuotedFields: ACCEPT_ONLY_RFC4180_ESCAPED
    Charset: UTF-8
    SkipHeaderLines: 0
    Newline: LF
    DefaultTimeZoneId: UTC
    HeaderLine: null
    StopOnInvalidRecord: true
    MaxQuotedSizeLimit: 131072
    DefaultTimestampFormat: '%Y-%m-%d %H:%M:%S.%N %z'
    CommentLineMarker: null
    TrimIfNotQuoted: false
    QuoteChar: '"'
    AllowOptionalColumns: false
    NullString: null
    DefaultDate: '1970-01-01'
    AllowExtraColumns: false
    EscapeChar: '"'
  FileInputTaskSource:
    LastPath: null
    FollowSymlinks: false
    PathPrefix: /opt/embulk/in/
    Files: [/opt/embulk/in/1_valid.csv, /opt/embulk/in/2_invalid.csv, /opt/embulk/in/3_valid.csv]
  ParserConfig:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: false
    allow_optional_columns: false
    stop_on_invalid_record: true
    columns:
    - {name: c0, type: long}
    - {name: c1, type: long}
out_task: {PrintsColumnNames: false, TimeZoneId: UTC}
in_schema:
- {index: 0, name: c0, type: long}
- {index: 1, name: c1, type: long}
out_schema:
- {index: 0, name: c0, type: long}
- {index: 1, name: c1, type: long}
in_reports:
- {}
- null
- {}
out_reports:
- {}
- {}
- null
- null
- {}
- {}
```
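In this file, the second entry of `in_reports` is `null`, which lines up with the second input file (in/2_invalid.csv) being the one that failed. Assuming a `null` entry marks an input task that has not finished (an inference from this run, not something taken from the embulk documentation), the following sketch lists the zero-based indexes of such tasks. The function name `unfinished_input_tasks` is hypothetical, and the awk script relies on the block-style list layout shown above.

```shell
#!/bin/sh
# Sketch only. Assumption: a "null" entry under in_reports marks an input task
# that did not finish. Hypothetical helper, not part of embulk.
# Expects the layout shown above: top-level "in_reports:" followed by "- ..." items.
unfinished_input_tasks() {
  awk '
    /^in_reports:/          { in_list = 1; idx = 0; next }
    in_list && /^- /        { if ($2 == "null") print idx; idx++; next }
    in_list && /^[^ -]/     { in_list = 0 }   # next top-level key ends the list
  ' "$1"
}
```

Calling `unfinished_input_tasks out/resume-state.yml` on the file above would print the index of the one `null` entry.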

Rewrite the invalid CSV into a valid one and load it again.

```console
$ echo "3,4" > in/2_invalid.csv
$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml --resume-state /opt/embulk/out/resume-state.yml
2018-08-09 09:16:15.729 +0000: Embulk v0.9.7
2018-08-09 09:16:17.284 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:16:21.046 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:16:22.230 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:16:22.345 +0000 [INFO] (0001:resume): Using local thread executor with max_threads=8 / output tasks 6 = input tasks 3 * 2
2018-08-09 09:16:22.351 +0000 [WARN] (0001:resume): Skipped resumed input task 0
2018-08-09 09:16:22.352 +0000 [WARN] (0001:resume): Skipped resumed input task 2
2018-08-09 09:16:22.352 +0000 [INFO] (0001:resume): {done: 2 / 3, running: 0}
3,4
2018-08-09 09:16:22.438 +0000 [INFO] (0001:resume): {done: 2 / 3, running: 1}
2018-08-09 09:16:22.458 +0000 [INFO] (main): Committed.
2018-08-09 09:16:22.459 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/opt/embulk/in/3_valid.csv"},"out":{}}
```

It succeeded. Note that the files that succeeded in the previous run are not printed again.
After a successful run, the resume-state file is deleted.
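Since the resume-state file exists only between a failed run and the next successful one, a wrapper script can use its presence to tell which kind of run comes next. The sketch below relies only on that observed behaviour; `next_run_mode` is a hypothetical helper name, and the path is the one used in this post.

```shell
#!/bin/sh
# Based only on the behaviour observed above: the resume-state file is written
# when a run fails and deleted when a run succeeds.
# next_run_mode is a hypothetical helper name, not an embulk command.
next_run_mode() {
  if [ -s "$1" ]; then
    echo resume   # a failed transaction is waiting to be resumed
  else
    echo fresh    # no state file: the next run starts a new transaction
  fi
}

next_run_mode out/resume-state.yml
```

Either way the embulk command line stays the same, because the same `--resume-state` path is passed on both the first run and the retry.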

config-diff

Use the same config.yml as in the resume-state section. Also delete every file under the in / out directories.
Once that is done, create one CSV file and run embulk.

```console
$ echo "1,2" > in/1.csv
$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml -c /opt/embulk/out/diff.yml
2018-08-09 09:33:14.704 +0000: Embulk v0.9.7
2018-08-09 09:33:16.178 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:33:20.000 +0000 [INFO] (main): Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:33:21.471 +0000 [INFO] (main): Started Embulk v0.9.7
2018-08-09 09:33:21.567 +0000 [INFO] (0001:transaction): Listing local files at directory '/opt/embulk/in' filtering filename by prefix ''
2018-08-09 09:33:21.571 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-08-09 09:33:21.591 +0000 [INFO] (0001:transaction): Loading files [/opt/embulk/in/1.csv]
2018-08-09 09:33:21.647 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-08-09 09:33:21.668 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
1,2
2018-08-09 09:33:21.790 +0000 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2018-08-09 09:33:21.803 +0000 [INFO] (main): Committed.
2018-08-09 09:33:21.803 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/opt/embulk/in/1.csv"},"out":{}}
```

Checking the diff file shows that the file that was read has been recorded.

```console
$ cat out/diff.yml
in: {last_path: /opt/embulk/in/1.csv}
out: {}
```

Add one more CSV file and run embulk again.

```console
$ echo "3,4" > in/2.csv

$ docker run --rm -v $(pwd)/config.yml:/tmp/config.yml -v $(pwd)/in:/opt/embulk/in -v $(pwd)/out:/opt/embulk/out -t tkuchiki/embulk run /tmp/config.yml -c /opt/embulk/out/diff.yml
2018-08-09 09:35:33.941 +0000: Embulk v0.9.7
2018-08-09 09:35:35.573 +0000 WARN: DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-08-09 09:35:39.302 +0000 INFO: Gem's home and path are set by default: "/root/.embulk/lib/gems"
2018-08-09 09:35:40.560 +0000 INFO: Started Embulk v0.9.7
2018-08-09 09:35:40.625 +0000 INFO: Listing local files at directory '/opt/embulk/in' filtering filename by prefix ''
2018-08-09 09:35:40.627 +0000 INFO: "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2018-08-09 09:35:40.644 +0000 INFO: Loading files [/opt/embulk/in/2.csv]
2018-08-09 09:35:40.694 +0000 INFO: Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2018-08-09 09:35:40.704 +0000 INFO: {done: 0 / 1, running: 0}
3,4
2018-08-09 09:35:40.827 +0000 INFO: {done: 1 / 1, running: 0}
2018-08-09 09:35:40.833 +0000 INFO: Committed.
2018-08-09 09:35:40.833 +0000 INFO: Next config diff: {"in":{"last_path":"/opt/embulk/in/2.csv"},"out":{}}
```

in/1.csv, which was loaded in the first run, is not processed; only the newly added in/2.csv is. The diff file has also been updated.

```console
$ cat out/diff.yml
in: {last_path: /opt/embulk/in/2.csv}
out: {}
```
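The two runs above are consistent with the file input skipping every path that does not sort after `last_path`. The sketch below reproduces that selection outside embulk, assuming a plain byte-order string comparison; that ordering rule is my assumption from the output, not something this post verifies. `read_last_path` and `pending_files` are hypothetical helper names, and the `sed` pattern only handles the one-line flow-style `in:` entry shown above.

```shell
#!/bin/sh
# Sketch only: mimics the filtering observed above, it is not embulk's code.
# Assumption: a path is processed only if it sorts strictly after last_path.

# read_last_path DIFF_FILE
# Extracts last_path from a line such as "in: {last_path: /opt/embulk/in/1.csv}".
read_last_path() {
  sed -n 's/^in: {last_path: \(.*\)}$/\1/p' "$1"
}

# pending_files LAST_PATH FILE...
# Prints the files that sort strictly after LAST_PATH, one per line.
pending_files() {
  last_path=$1
  shift
  # LC_ALL=C forces byte-wise ordering; appending "" forces string comparison in awk.
  printf '%s\n' "$@" | LC_ALL=C sort |
    LC_ALL=C awk -v last="$last_path" '($0 "") > (last "")'
}

pending_files /opt/embulk/in/1.csv /opt/embulk/in/1.csv /opt/embulk/in/2.csv
```

If the assumption holds, a file added later under a name that sorts before `last_path` (for example in/0.csv) would be skipped as well, so it is worth checking file naming before relying on config-diff.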

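I omit the output, but running embulk again in this state finishes without processing anything.
Note that resume-state and config-diff can only be used with plugins that support them.
embulk-output-bigquery does not support resume-state. With config-diff it does write a diff, but nothing was executed even after I added a file.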
