[Google Maps] Google Mapsで日本の公共交通機関での所要時間を取得する方法(非推奨)

2026/02/11

Google Maps
Python
Playwright

こんにちは。

今回は、Google Mapsで日本の公共交通機関を使った場合の所要時間を、プログラムから取得する方法を紹介します。

結論から言うと、正規のAPIでは現時点で日本の公共交通機関のルート検索ができないため、ブラウザ操作を自動化するスクレイピング手法で取得しています

前提:なぜこのアプローチが必要なのか

通常であれば、Google Maps Platform の Directions APIRoutes API を使って経路情報を取得するのが正攻法です。

しかし、日本の公共交通機関(電車・バスなど)については、これらのAPIが対応していません。APIに mode=transit を指定してリクエストしても、日本国内のルートではレスポンスが返ってこないのです。

これはGoogle Maps Platformの既知の制限で、日本の交通データがAPIとして公開されていないことが原因です。

一方、ブラウザ版のGoogle Mapsでは日本の公共交通機関のルート検索が問題なく動作します。この差を利用して、Playwrightでブラウザを自動操作し、Google Mapsの検索結果をスクレイピングするというのが今回のアプローチです。

⚠️ このアプローチは非推奨です

この方法はGoogle Mapsの利用規約に抵触する可能性があります。また、GoogleがUIを変更した場合にセレクタが壊れて動作しなくなるリスクがあります。あくまで正規APIが使えない場合の暫定的な手段として参考にしてください。

アプローチの概要

今回のスクリプトは以下のような流れで動作します。

  1. CSVファイルから出発地・目的地の一覧を読み込む
  2. Google Mapsの経路検索URLを組み立ててブラウザでアクセスする
  3. 公共交通機関モードで到着時刻・日付を設定する
  4. 検索結果から所要時間と時刻範囲をスクレイピングする
  5. 結果をCSVファイルに書き出す

大量のルート検索を一括で処理するため、非同期処理(asyncio)とセマフォによる同時実行制限を組み合わせています。また、途中で処理が中断した場合に再開できるレジューム機能も備えています。

実際のコード

import asyncio
import csv
import os
import datetime
import urllib.parse
from playwright.async_api import async_playwright

class Config:
    """設定クラス"""
    MAX_CONCURRENT_TASKS = 1  # 同時実行制限
    FROM_CSV = "data/from.csv"
    TO_CSV = "data/to.csv"
    OUTPUT_CSV = "output.csv"
    TARGET_DATE = "2026/03/04"
    TARGET_TIME = "08:20"
    HEADLESS = True     # ブラウザの表示設定
    TIMEOUT_MS = 60000   # ページ遷移等のタイムアウト(ms)
    LOCALE = "ja-JP"

async def search_route(context, from_addr, to_addr):
    """
    指定された条件でルート検索を行い、結果(所要時間、時刻範囲)を返す非同期関数
    """
    page = await context.new_page()
    try:
        print(f"--- ルート検索開始: {from_addr} -> {to_addr} ---")
        
        encoded_from = urllib.parse.quote(from_addr)
        encoded_to = urllib.parse.quote(to_addr)
        # data=!4m2!4m1!3e3 は「公共交通機関」を指定するパラメータ
        url = f"https://www.google.co.jp/maps/dir/{encoded_from}/{encoded_to}/data=!4m2!4m1!3e3"
        
        # 1. ナビゲーション
        await page.goto(url, timeout=Config.TIMEOUT_MS)
        try:
            await page.wait_for_load_state("domcontentloaded")
        except:
             pass 
        
        # 2. 移動手段を「公共交通機関」に設定 (URLで指定済みのため、確認・補完用)
        # 念のためボタンがあればクリックするが、見つからなくてもエラーにしない
        try:
            transit_mode_btn = page.locator("div[aria-label='公共交通機関'], button[aria-label='公共交通機関']").first
            if await transit_mode_btn.is_visible():
                await transit_mode_btn.click()
            else:
                 # アイコン検索 (失敗しても続行)
                 icon_btn = page.locator("img[src*='transit']").first.locator("..")
                 if await icon_btn.is_visible():
                     await icon_btn.click()
        except:
            pass

        # 3. 到着時刻・日付の設定
        # ページ読み込み後の安定化待ち
        await asyncio.sleep(3)

        # "出発時刻" / "到着時刻" のドロップダウンを開く
        # クラス指定(e2moi)を優先し、その後にテキスト検索を行う
        # 複数のセレクタ候補を作成
        time_btn_candidates = [
             page.locator("button.e2moi"),
             page.locator("span:has-text('出発時刻')"), 
             page.locator("span:has-text('到着時刻')"), 
             page.locator("span:has-text('すぐに出発')"),
             page.locator("button[aria-label*='時刻']"),
             page.locator("button[aria-label*='time']")
        ]
        
        time_menu_btn = None
        for cand in time_btn_candidates:
             if await cand.count() > 0:
                 # 最初に見つかった候補を使用 (可視性待機)
                 try:
                     # タイムアウトを短めにして次へ
                     cand_first = cand.first
                     if await cand_first.is_visible():
                        time_menu_btn = cand_first
                        break
                 except:
                     continue
        
        if time_menu_btn:
            # 安定性のためのリトライループ
            max_retries = 3
            for attempt in range(max_retries):
                print(f"  Debug: Time setting attempt {attempt+1}/{max_retries}")
                
                # ボタンのテキスト取得 (JSのプロパティ修正)
                btn_text = await time_menu_btn.inner_text()
                print(f"  Debug: Current Button Text: {btn_text}")

                # すでに設定済みならスキップ (到着時刻が含まれていればOK)
                # "到着" や "Arrive" が含まれているか、または設定した時刻が含まれているか
                # ただし、初回は必ず設定するフローを通す
                
                # メニューが開いていない場合は開く
                # "aria-expanded" 属性があれば確認できるが、単純にクリックしてオプションを探す
                await time_menu_btn.click(force=True)
                await asyncio.sleep(1)

                # "到着時刻" を選択
                arrive_by_option = page.locator("div[role='menuitemradio']").filter(has_text="到着時刻").first
                if await arrive_by_option.is_visible():
                    print("  Debug: Clicking 'Arrive by' option")
                    await arrive_by_option.click()
                
                # 日付の設定
                dt = datetime.datetime.strptime(Config.TARGET_DATE, "%Y/%m/%d")
                target_label = f"{dt.day} {dt.month}月"
                
                date_picker_btn = page.locator("button[jsaction*='openDatePicker']").first
                if await date_picker_btn.is_visible():
                    await date_picker_btn.click()
                    await asyncio.sleep(0.5)
                    
                    found_date = False
                    dialog = page.locator("div[role='dialog'], div.goog-date-picker").first
                    for _ in range(12):
                        day_cell = page.locator(f"td[role='gridcell'][aria-label*='{target_label}']").first
                        if await day_cell.count() > 0 and await day_cell.is_visible():
                            await day_cell.click()
                            found_date = True
                            print(f"  Debug: Date set to {Config.TARGET_DATE}")
                            break
                        else:
                            # 次の月へ
                            next_btn = None
                            for sel in ["button:has-text('»')", "[aria-label='次の月']", "[aria-label='Next month']", "button:has-text('>')", "span:has-text('navigate_next')"]:
                                c = dialog.locator(sel).first
                                if await c.is_visible():
                                    next_btn = c
                                    break
                            
                            if next_btn:
                                await next_btn.click()
                                await asyncio.sleep(0.5)
                            else:
                                break

                # 時刻の設定
                time_input = page.locator("input[name='transit-time']").first
                if not await time_input.is_visible():
                    time_input = page.locator("input[aria-label*='時間']").first
                
                if await time_input.is_visible():
                    await time_input.click()
                    await time_input.fill(Config.TARGET_TIME)
                    await time_input.press("Enter")
                    await time_input.press("Tab")
                    print(f"  Debug: Time filed filled with {Config.TARGET_TIME}")
                
                await asyncio.sleep(2)
                
                # 確認
                updated_text = await time_menu_btn.inner_text()
                print(f"  Debug: Button Text after set: {updated_text}")
                
                if "到着" in updated_text or "Arrive" in updated_text or Config.TARGET_TIME in updated_text:
                    print("  Debug: Time setting verified success.")
                    break
                else:
                    print("  ! Debug: Time setting verification failed. Retrying...")
                    await asyncio.sleep(1)

        else:
            print("  ! 時刻設定メニューボタンが見つかりません (スクリーンショットを保存します: error_time_menu.png)")
            await page.screenshot(path="error_time_menu.png")
            
        # 最終確認ログ
        print(f"  Debug: Current URL: {page.url}")
        if attempt == max_retries - 1:
             await page.screenshot(path="debug_after_setting_final.png")

        # 設定が反映されるのを待つ
        await asyncio.sleep(2)
        
        # 4. 結果取得
        result_selector = 'div[data-trip-index="0"]'
        try:
            await page.wait_for_selector(result_selector, timeout=8000)
        except:
            print(f"  ! ルートが見つかりませんでした: {from_addr} -> {to_addr}")
            return "ルートなし", "ルートなし", 0, "ルートなし"

        trip_element = page.locator(result_selector).first
        
        # 所要時間
        minutes = 0
        try:
            duration = await trip_element.locator(".fontHeadlineSmall").first.inner_text()
            # "1 時間 35 分" -> 分単位に変換
            import re
            hours = 0
            mins = 0
            
            h_match = re.search(r'(\d+)\s*時間', duration)
            if h_match:
                hours = int(h_match.group(1))
            
            m_match = re.search(r'(\d+)\s*', duration)
            if m_match:
                mins = int(m_match.group(1))
            
            minutes = hours * 60 + mins
        except:
            duration = "不明"

        # 時刻範囲
        time_range = "不明"
        try:
            header = trip_element.locator("h1.fontHeadlineSmall").first
            if await header.count() > 0:
                time_range = await header.inner_text()
            else:
                 inner = await trip_element.inner_text()
                 lines = inner.split('\n')
                 if len(lines) > 1:
                     time_range = lines[1]
        except:
            pass
            
        print(f"  Result: {from_addr} -> {to_addr} : {duration} ({minutes} min)")
        return "成功", duration, minutes, time_range

    except Exception as e:
        print(f"  ! Error ({from_addr} -> {to_addr}): {e}")
        return "エラー", str(e), 0, ""
    finally:
        await page.close()

async def worker(sem, context, row_data, output_lock):
    async with sem:
        from_name = row_data['from_name']
        from_addr = row_data['from_addr']
        to_name = row_data['to_name']
        to_addr = row_data['to_addr']
        
        # search_routeはConfigを使うようになったので引数を減らすことも可能だが、
        # アドレス情報は必須。TargetDateなどはConfig参照で統一。
        status, duration, minutes, time_range = await search_route(context, from_addr, to_addr)
        
        async with output_lock:
             with open(Config.OUTPUT_CSV, 'a', encoding='utf-8', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([from_name, from_addr, to_name, to_addr, status, duration, minutes, time_range, Config.TARGET_DATE, Config.TARGET_TIME])

async def main():
    # データ読み込み
    from_list = []
    if os.path.exists(Config.FROM_CSV):
        with open(Config.FROM_CSV, encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                from_list.append(row)
    
    to_list = []
    if os.path.exists(Config.TO_CSV):
        with open(Config.TO_CSV, encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                to_list.append(row)
    
    print(f"From: {len(from_list)}件, To: {len(to_list)}件")
    total_combinations = len(from_list) * len(to_list)
    print(f"Total combinations: {total_combinations}")
    print(f"Concurrency Limit: {Config.MAX_CONCURRENT_TASKS}")

    # 結果ファイル準備 & 既存データの読み込み (Resume機能 - シンプル版)
    last_processed_pair = None
    file_exists = os.path.isfile(Config.OUTPUT_CSV)
    
    if file_exists:
        try:
            with open(Config.OUTPUT_CSV, 'r', encoding='utf-8') as f:
                # 最後の空行ではない行を取得するために、末尾から少し読むか、全行読む
                # ファイルサイズがそれほど大きくなければ全行読み込みで十分高速
                lines = f.readlines()
                for line in reversed(lines):
                    line = line.strip()
                    if line:
                        # CSVパース (簡易的)
                        # カンマ区切りだが、値にカンマが含まれる場合は注意が必要。
                        # ここでは簡易的に csv モジュールを使ってパースする
                        import io
                        reader = csv.reader(io.StringIO(line))
                        row = next(reader)
                        if row and len(row) >= 4 and row[1] != "from_addr": # ヘッダー除外
                             last_processed_pair = (row[1], row[3])
                             print(f"Resuming from after: {last_processed_pair}")
                             break
        except Exception as e:
            print(f"Warning: Failed to read last line of output file: {e}")

    with open(Config.OUTPUT_CSV, 'a', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow(["from_name", "from_addr", "to_name", "to_addr", "status", "duration", "minutes", "time_range", "date", "arrive_by"])

    async with async_playwright() as p:
        print("Launching browser...")
        # ブラウザ起動
        browser = await p.chromium.launch(headless=Config.HEADLESS)
        context = await browser.new_context(locale=Config.LOCALE)

        sem = asyncio.Semaphore(Config.MAX_CONCURRENT_TASKS)
        lock = asyncio.Lock()
        tasks = []

        count = 0
        skipped_count = 0
        resume_mode = (last_processed_pair is not None)
        
        for f_item in from_list:
            for t_item in to_list:
                current_pair = (f_item['address'], t_item['address'])
                
                if resume_mode:
                    if current_pair == last_processed_pair:
                        resume_mode = False
                        skipped_count += 1
                        print("Found last processed item. Resuming process from next item.")
                    else:
                        skipped_count += 1
                    
                    # ループの最後で continue すると、このループの他の処理(count+=1とか)もスキップされる
                    # ここではタスク追加をスキップするだけにする
                    continue

                count += 1
                row_data = {
                    'from_name': f_item['name'],
                    'from_addr': f_item['address'],
                    'to_name': t_item['name'],
                    'to_addr': t_item['address']
                }
                # タスク作成
                tasks.append(worker(sem, context, row_data, lock))

        print(f"Skipped {skipped_count} tasks (already processed).")
        print(f"Starting {len(tasks)} tasks processing...")
        await asyncio.gather(*tasks)
        
        print("\nBatch process finished.")

if __name__ == "__main__":
    asyncio.run(main())

コードの解説

Configクラス

スクリプト全体の設定を一箇所にまとめています。

  • MAX_CONCURRENT_TASKS = 1:同時実行数を1に制限しています。Google Mapsへの過剰なリクエストを避けるためです。値を増やすと処理速度は上がりますが、ブロックされるリスクも高まります。
  • FROM_CSV / TO_CSV:出発地と目的地のリストを記載したCSVファイルのパスです。CSVには nameaddress のカラムが必要です。
  • TARGET_DATE / TARGET_TIME:到着したい日付と時刻です。ルート検索は「この時刻に到着する」という条件で行われます。
  • HEADLESS = True:ブラウザを非表示で動作させます。デバッグ時は False にすると実際のブラウザ操作が確認できます。

search_route関数

メインのスクレイピングロジックです。1つのルート検索を担当します。

URLの組み立て

https://www.google.co.jp/maps/dir/{出発地}/{目的地}/data=!4m2!4m1!3e3

末尾の data=!4m2!4m1!3e3 が公共交通機関モードを指定するパラメータです。これにより、ページを開いた時点で公共交通機関での検索結果が表示されます。

到着時刻の設定

Google Mapsのデフォルトは「すぐに出発」のため、到着時刻を明示的に設定する必要があります。以下のステップで行っています。

  1. 時刻設定のドロップダウンボタンを複数のセレクタ候補から検索して特定
  2. ドロップダウンを開いて「到着時刻」オプションを選択
  3. カレンダーから日付を選択(必要に応じて月を進める)
  4. 時刻入力フィールドに目標時刻を入力
  5. 設定が反映されたかボタンのテキストで検証(最大3回リトライ)

この部分が最も壊れやすい箇所です。Google MapsのUIは頻繁に変更されるため、セレクタが合わなくなることがあります。

結果の取得

検索結果は div[data-trip-index="0"](最初のルート候補)から取得しています。所要時間は「1 時間 35 分」のようなテキストから正規表現で時間と分を抽出し、分単位に変換しています。

worker関数

個々のルート検索をラップする非同期ワーカーです。セマフォ(sem)で同時実行数を制限し、出力ファイルへの書き込みをロック(output_lock)で排他制御しています。

main関数

全体のオーケストレーションを担当します。

レジューム機能

出力CSVファイルが既に存在する場合、最後に処理された出発地・目的地のペアを読み取り、そこから処理を再開します。大量のルート検索を行う場合、途中でエラーが発生しても最初からやり直す必要がありません。

全組み合わせの生成

出発地リスト × 目的地リストの全組み合わせに対して検索を実行します。例えば出発地が10件、目的地が20件であれば、200件のルート検索が行われます。

入力CSVの形式

data/from.csvdata/to.csv は以下のような形式で用意します。

name,address
東京駅,東京都千代田区丸の内1丁目
新宿駅,東京都新宿区新宿3丁目

name は結果CSVでの識別用ラベル、address はGoogle Mapsで検索可能な住所文字列です。

出力結果

output.csv には以下のカラムが出力されます。

カラム 説明
from_name 出発地の名前
from_addr 出発地の住所
to_name 目的地の名前
to_addr 目的地の住所
status 成功 / ルートなし / エラー
duration 所要時間(テキスト)
minutes 所要時間(分単位)
time_range 出発〜到着の時刻範囲
date 検索対象日付
arrive_by 到着希望時刻

まとめ

Google Maps Directions API / Routes APIが日本の公共交通機関に対応していないため、ブラウザ自動操作によるスクレイピングで所要時間を取得する方法を紹介しました。

正規のAPIが使えれば圧倒的にシンプルで安定した実装が可能ですが、現時点では選択肢がないため、このようなアプローチを取っています。

注意点として、以下を改めてお伝えします。

  • 利用規約のリスク:Google Mapsのスクレイピングは利用規約に抵触する可能性があります
  • UI変更による破損リスク:セレクタはGoogleのUI変更で容易に壊れます
  • レート制限:大量リクエストを送るとブロックされる可能性があります

日本の公共交通機関のデータがAPIとして正式に提供されることを期待しつつ、それまでの暫定的な手段として参考にしていただければ幸いです。

それではまた!