こんにちは。
今回は、Google Mapsで日本の公共交通機関を使った場合の所要時間を、プログラムから取得する方法を紹介します。
結論から言うと、正規のAPIでは現時点で日本の公共交通機関のルート検索ができないため、ブラウザ操作を自動化するスクレイピング手法で取得しています。
前提:なぜこのアプローチが必要なのか
通常であれば、Google Maps Platform の Directions API や Routes API を使って経路情報を取得するのが正攻法です。
しかし、日本の公共交通機関(電車・バスなど)については、これらのAPIが対応していません。APIに mode=transit を指定してリクエストしても、日本国内のルートではレスポンスが返ってこないのです。
これはGoogle Maps Platformの既知の制限で、日本の交通データがAPIとして公開されていないことが原因です。
一方、ブラウザ版のGoogle Mapsでは日本の公共交通機関のルート検索が問題なく動作します。この差を利用して、Playwrightでブラウザを自動操作し、Google Mapsの検索結果をスクレイピングするというのが今回のアプローチです。
⚠️ このアプローチは非推奨です
この方法はGoogle Mapsの利用規約に抵触する可能性があります。また、GoogleがUIを変更した場合にセレクタが壊れて動作しなくなるリスクがあります。あくまで正規APIが使えない場合の暫定的な手段として参考にしてください。
アプローチの概要
今回のスクリプトは以下のような流れで動作します。
- CSVファイルから出発地・目的地の一覧を読み込む
- Google Mapsの経路検索URLを組み立ててブラウザでアクセスする
- 公共交通機関モードで到着時刻・日付を設定する
- 検索結果から所要時間と時刻範囲をスクレイピングする
- 結果をCSVファイルに書き出す
大量のルート検索を一括で処理するため、非同期処理(asyncio)とセマフォによる同時実行制限を組み合わせています。また、途中で処理が中断した場合に再開できるレジューム機能も備えています。
実際のコード
import asyncio
import csv
import os
import datetime
import urllib.parse
from playwright.async_api import async_playwright
class Config:
"""設定クラス"""
MAX_CONCURRENT_TASKS = 1 # 同時実行制限
FROM_CSV = "data/from.csv"
TO_CSV = "data/to.csv"
OUTPUT_CSV = "output.csv"
TARGET_DATE = "2026/03/04"
TARGET_TIME = "08:20"
HEADLESS = True # ブラウザの表示設定
TIMEOUT_MS = 60000 # ページ遷移等のタイムアウト(ms)
LOCALE = "ja-JP"
async def search_route(context, from_addr, to_addr):
"""
指定された条件でルート検索を行い、結果(所要時間、時刻範囲)を返す非同期関数
"""
page = await context.new_page()
try:
print(f"--- ルート検索開始: {from_addr} -> {to_addr} ---")
encoded_from = urllib.parse.quote(from_addr)
encoded_to = urllib.parse.quote(to_addr)
# data=!4m2!4m1!3e3 は「公共交通機関」を指定するパラメータ
url = f"https://www.google.co.jp/maps/dir/{encoded_from}/{encoded_to}/data=!4m2!4m1!3e3"
# 1. ナビゲーション
await page.goto(url, timeout=Config.TIMEOUT_MS)
try:
await page.wait_for_load_state("domcontentloaded")
except:
pass
# 2. 移動手段を「公共交通機関」に設定 (URLで指定済みのため、確認・補完用)
# 念のためボタンがあればクリックするが、見つからなくてもエラーにしない
try:
transit_mode_btn = page.locator("div[aria-label='公共交通機関'], button[aria-label='公共交通機関']").first
if await transit_mode_btn.is_visible():
await transit_mode_btn.click()
else:
# アイコン検索 (失敗しても続行)
icon_btn = page.locator("img[src*='transit']").first.locator("..")
if await icon_btn.is_visible():
await icon_btn.click()
except:
pass
# 3. 到着時刻・日付の設定
# ページ読み込み後の安定化待ち
await asyncio.sleep(3)
# "出発時刻" / "到着時刻" のドロップダウンを開く
# クラス指定(e2moi)を優先し、その後にテキスト検索を行う
# 複数のセレクタ候補を作成
time_btn_candidates = [
page.locator("button.e2moi"),
page.locator("span:has-text('出発時刻')"),
page.locator("span:has-text('到着時刻')"),
page.locator("span:has-text('すぐに出発')"),
page.locator("button[aria-label*='時刻']"),
page.locator("button[aria-label*='time']")
]
time_menu_btn = None
for cand in time_btn_candidates:
if await cand.count() > 0:
# 最初に見つかった候補を使用 (可視性待機)
try:
# タイムアウトを短めにして次へ
cand_first = cand.first
if await cand_first.is_visible():
time_menu_btn = cand_first
break
except:
continue
if time_menu_btn:
# 安定性のためのリトライループ
max_retries = 3
for attempt in range(max_retries):
print(f" Debug: Time setting attempt {attempt+1}/{max_retries}")
# ボタンのテキスト取得 (JSのプロパティ修正)
btn_text = await time_menu_btn.inner_text()
print(f" Debug: Current Button Text: {btn_text}")
# すでに設定済みならスキップ (到着時刻が含まれていればOK)
# "到着" や "Arrive" が含まれているか、または設定した時刻が含まれているか
# ただし、初回は必ず設定するフローを通す
# メニューが開いていない場合は開く
# "aria-expanded" 属性があれば確認できるが、単純にクリックしてオプションを探す
await time_menu_btn.click(force=True)
await asyncio.sleep(1)
# "到着時刻" を選択
arrive_by_option = page.locator("div[role='menuitemradio']").filter(has_text="到着時刻").first
if await arrive_by_option.is_visible():
print(" Debug: Clicking 'Arrive by' option")
await arrive_by_option.click()
# 日付の設定
dt = datetime.datetime.strptime(Config.TARGET_DATE, "%Y/%m/%d")
target_label = f"{dt.day} {dt.month}月"
date_picker_btn = page.locator("button[jsaction*='openDatePicker']").first
if await date_picker_btn.is_visible():
await date_picker_btn.click()
await asyncio.sleep(0.5)
found_date = False
dialog = page.locator("div[role='dialog'], div.goog-date-picker").first
for _ in range(12):
day_cell = page.locator(f"td[role='gridcell'][aria-label*='{target_label}']").first
if await day_cell.count() > 0 and await day_cell.is_visible():
await day_cell.click()
found_date = True
print(f" Debug: Date set to {Config.TARGET_DATE}")
break
else:
# 次の月へ
next_btn = None
for sel in ["button:has-text('»')", "[aria-label='次の月']", "[aria-label='Next month']", "button:has-text('>')", "span:has-text('navigate_next')"]:
c = dialog.locator(sel).first
if await c.is_visible():
next_btn = c
break
if next_btn:
await next_btn.click()
await asyncio.sleep(0.5)
else:
break
# 時刻の設定
time_input = page.locator("input[name='transit-time']").first
if not await time_input.is_visible():
time_input = page.locator("input[aria-label*='時間']").first
if await time_input.is_visible():
await time_input.click()
await time_input.fill(Config.TARGET_TIME)
await time_input.press("Enter")
await time_input.press("Tab")
print(f" Debug: Time filed filled with {Config.TARGET_TIME}")
await asyncio.sleep(2)
# 確認
updated_text = await time_menu_btn.inner_text()
print(f" Debug: Button Text after set: {updated_text}")
if "到着" in updated_text or "Arrive" in updated_text or Config.TARGET_TIME in updated_text:
print(" Debug: Time setting verified success.")
break
else:
print(" ! Debug: Time setting verification failed. Retrying...")
await asyncio.sleep(1)
else:
print(" ! 時刻設定メニューボタンが見つかりません (スクリーンショットを保存します: error_time_menu.png)")
await page.screenshot(path="error_time_menu.png")
# 最終確認ログ
print(f" Debug: Current URL: {page.url}")
if attempt == max_retries - 1:
await page.screenshot(path="debug_after_setting_final.png")
# 設定が反映されるのを待つ
await asyncio.sleep(2)
# 4. 結果取得
result_selector = 'div[data-trip-index="0"]'
try:
await page.wait_for_selector(result_selector, timeout=8000)
except:
print(f" ! ルートが見つかりませんでした: {from_addr} -> {to_addr}")
return "ルートなし", "ルートなし", 0, "ルートなし"
trip_element = page.locator(result_selector).first
# 所要時間
minutes = 0
try:
duration = await trip_element.locator(".fontHeadlineSmall").first.inner_text()
# "1 時間 35 分" -> 分単位に変換
import re
hours = 0
mins = 0
h_match = re.search(r'(\d+)\s*時間', duration)
if h_match:
hours = int(h_match.group(1))
m_match = re.search(r'(\d+)\s*分', duration)
if m_match:
mins = int(m_match.group(1))
minutes = hours * 60 + mins
except:
duration = "不明"
# 時刻範囲
time_range = "不明"
try:
header = trip_element.locator("h1.fontHeadlineSmall").first
if await header.count() > 0:
time_range = await header.inner_text()
else:
inner = await trip_element.inner_text()
lines = inner.split('\n')
if len(lines) > 1:
time_range = lines[1]
except:
pass
print(f" Result: {from_addr} -> {to_addr} : {duration} ({minutes} min)")
return "成功", duration, minutes, time_range
except Exception as e:
print(f" ! Error ({from_addr} -> {to_addr}): {e}")
return "エラー", str(e), 0, ""
finally:
await page.close()
async def worker(sem, context, row_data, output_lock):
async with sem:
from_name = row_data['from_name']
from_addr = row_data['from_addr']
to_name = row_data['to_name']
to_addr = row_data['to_addr']
# search_routeはConfigを使うようになったので引数を減らすことも可能だが、
# アドレス情報は必須。TargetDateなどはConfig参照で統一。
status, duration, minutes, time_range = await search_route(context, from_addr, to_addr)
async with output_lock:
with open(Config.OUTPUT_CSV, 'a', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow([from_name, from_addr, to_name, to_addr, status, duration, minutes, time_range, Config.TARGET_DATE, Config.TARGET_TIME])
async def main():
# データ読み込み
from_list = []
if os.path.exists(Config.FROM_CSV):
with open(Config.FROM_CSV, encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
from_list.append(row)
to_list = []
if os.path.exists(Config.TO_CSV):
with open(Config.TO_CSV, encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
to_list.append(row)
print(f"From: {len(from_list)}件, To: {len(to_list)}件")
total_combinations = len(from_list) * len(to_list)
print(f"Total combinations: {total_combinations}")
print(f"Concurrency Limit: {Config.MAX_CONCURRENT_TASKS}")
# 結果ファイル準備 & 既存データの読み込み (Resume機能 - シンプル版)
last_processed_pair = None
file_exists = os.path.isfile(Config.OUTPUT_CSV)
if file_exists:
try:
with open(Config.OUTPUT_CSV, 'r', encoding='utf-8') as f:
# 最後の空行ではない行を取得するために、末尾から少し読むか、全行読む
# ファイルサイズがそれほど大きくなければ全行読み込みで十分高速
lines = f.readlines()
for line in reversed(lines):
line = line.strip()
if line:
# CSVパース (簡易的)
# カンマ区切りだが、値にカンマが含まれる場合は注意が必要。
# ここでは簡易的に csv モジュールを使ってパースする
import io
reader = csv.reader(io.StringIO(line))
row = next(reader)
if row and len(row) >= 4 and row[1] != "from_addr": # ヘッダー除外
last_processed_pair = (row[1], row[3])
print(f"Resuming from after: {last_processed_pair}")
break
except Exception as e:
print(f"Warning: Failed to read last line of output file: {e}")
with open(Config.OUTPUT_CSV, 'a', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(["from_name", "from_addr", "to_name", "to_addr", "status", "duration", "minutes", "time_range", "date", "arrive_by"])
async with async_playwright() as p:
print("Launching browser...")
# ブラウザ起動
browser = await p.chromium.launch(headless=Config.HEADLESS)
context = await browser.new_context(locale=Config.LOCALE)
sem = asyncio.Semaphore(Config.MAX_CONCURRENT_TASKS)
lock = asyncio.Lock()
tasks = []
count = 0
skipped_count = 0
resume_mode = (last_processed_pair is not None)
for f_item in from_list:
for t_item in to_list:
current_pair = (f_item['address'], t_item['address'])
if resume_mode:
if current_pair == last_processed_pair:
resume_mode = False
skipped_count += 1
print("Found last processed item. Resuming process from next item.")
else:
skipped_count += 1
# ループの最後で continue すると、このループの他の処理(count+=1とか)もスキップされる
# ここではタスク追加をスキップするだけにする
continue
count += 1
row_data = {
'from_name': f_item['name'],
'from_addr': f_item['address'],
'to_name': t_item['name'],
'to_addr': t_item['address']
}
# タスク作成
tasks.append(worker(sem, context, row_data, lock))
print(f"Skipped {skipped_count} tasks (already processed).")
print(f"Starting {len(tasks)} tasks processing...")
await asyncio.gather(*tasks)
print("\nBatch process finished.")
if __name__ == "__main__":
asyncio.run(main())
コードの解説
Configクラス
スクリプト全体の設定を一箇所にまとめています。
MAX_CONCURRENT_TASKS = 1:同時実行数を1に制限しています。Google Mapsへの過剰なリクエストを避けるためです。値を増やすと処理速度は上がりますが、ブロックされるリスクも高まります。FROM_CSV/TO_CSV:出発地と目的地のリストを記載したCSVファイルのパスです。CSVにはnameとaddressのカラムが必要です。TARGET_DATE/TARGET_TIME:到着したい日付と時刻です。ルート検索は「この時刻に到着する」という条件で行われます。HEADLESS = True:ブラウザを非表示で動作させます。デバッグ時はFalseにすると実際のブラウザ操作が確認できます。
search_route関数
メインのスクレイピングロジックです。1つのルート検索を担当します。
URLの組み立て
https://www.google.co.jp/maps/dir/{出発地}/{目的地}/data=!4m2!4m1!3e3
末尾の data=!4m2!4m1!3e3 が公共交通機関モードを指定するパラメータです。これにより、ページを開いた時点で公共交通機関での検索結果が表示されます。
到着時刻の設定
Google Mapsのデフォルトは「すぐに出発」のため、到着時刻を明示的に設定する必要があります。以下のステップで行っています。
- 時刻設定のドロップダウンボタンを複数のセレクタ候補から検索して特定
- ドロップダウンを開いて「到着時刻」オプションを選択
- カレンダーから日付を選択(必要に応じて月を進める)
- 時刻入力フィールドに目標時刻を入力
- 設定が反映されたかボタンのテキストで検証(最大3回リトライ)
この部分が最も壊れやすい箇所です。Google MapsのUIは頻繁に変更されるため、セレクタが合わなくなることがあります。
結果の取得
検索結果は div[data-trip-index="0"](最初のルート候補)から取得しています。所要時間は「1 時間 35 分」のようなテキストから正規表現で時間と分を抽出し、分単位に変換しています。
worker関数
個々のルート検索をラップする非同期ワーカーです。セマフォ(sem)で同時実行数を制限し、出力ファイルへの書き込みをロック(output_lock)で排他制御しています。
main関数
全体のオーケストレーションを担当します。
レジューム機能
出力CSVファイルが既に存在する場合、最後に処理された出発地・目的地のペアを読み取り、そこから処理を再開します。大量のルート検索を行う場合、途中でエラーが発生しても最初からやり直す必要がありません。
全組み合わせの生成
出発地リスト × 目的地リストの全組み合わせに対して検索を実行します。例えば出発地が10件、目的地が20件であれば、200件のルート検索が行われます。
入力CSVの形式
data/from.csv と data/to.csv は以下のような形式で用意します。
name,address
東京駅,東京都千代田区丸の内1丁目
新宿駅,東京都新宿区新宿3丁目
name は結果CSVでの識別用ラベル、address はGoogle Mapsで検索可能な住所文字列です。
出力結果
output.csv には以下のカラムが出力されます。
| カラム | 説明 |
|---|---|
| from_name | 出発地の名前 |
| from_addr | 出発地の住所 |
| to_name | 目的地の名前 |
| to_addr | 目的地の住所 |
| status | 成功 / ルートなし / エラー |
| duration | 所要時間(テキスト) |
| minutes | 所要時間(分単位) |
| time_range | 出発〜到着の時刻範囲 |
| date | 検索対象日付 |
| arrive_by | 到着希望時刻 |
まとめ
Google Maps Directions API / Routes APIが日本の公共交通機関に対応していないため、ブラウザ自動操作によるスクレイピングで所要時間を取得する方法を紹介しました。
正規のAPIが使えれば圧倒的にシンプルで安定した実装が可能ですが、現時点では選択肢がないため、このようなアプローチを取っています。
注意点として、以下を改めてお伝えします。
- 利用規約のリスク:Google Mapsのスクレイピングは利用規約に抵触する可能性があります
- UI変更による破損リスク:セレクタはGoogleのUI変更で容易に壊れます
- レート制限:大量リクエストを送るとブロックされる可能性があります
日本の公共交通機関のデータがAPIとして正式に提供されることを期待しつつ、それまでの暫定的な手段として参考にしていただければ幸いです。
それではまた!