Skip to content

Цепи Маркова: подробное объяснение с примерами на Python

Что такое цепь Маркова?

Цепь Маркова - это математическая модель, описывающая последовательность событий, где вероятность каждого события зависит только от предыдущего состояния (свойство отсутствия памяти).

Основные понятия

1. Состояния

$ S = {s₁, s₂, ..., sₙ} $ - множество возможных состояний системы

2. Матрица переходных вероятностей

$ P = [pᵢⱼ] - матрица, где pᵢⱼ = P(Xₙ₊₁ = j | Xₙ = i) $

  • Сумма вероятностей в каждой строке = 1

3. Свойство Маркова

$ P(Xₙ₊₁ = j | Xₙ = i, Xₙ₋₁ = iₙ₋₁, ..., X₀ = i₀) = P(Xₙ₊₁ = j | Xₙ = i) $

Типы цепей Маркова

1. Однородные vs Неоднородные

  • Однородные: вероятности переходов не зависят от времени
  • Неоднородные: вероятности меняются со временем

2. Эргодические vs Поглощающие

  • Эргодические: из любого состояния можно достичь любого другого
  • Поглощающие: есть состояния, из которых нельзя выйти

Примеры реализации

Пример 1: Простая цепь Маркова для погоды

import numpy as np
import matplotlib.pyplot as plt

class MarkovChain:
    def __init__(self, transition_matrix, states):
        """
        transition_matrix: матрица переходных вероятностей
        states: список состояний
        """
        self.transition_matrix = np.array(transition_matrix)
        self.states = states
        self.state_index = {state: i for i, state in enumerate(states)}
        self.current_state = None

    def set_state(self, state):
        """Установить текущее состояние"""
        self.current_state = self.state_index[state]

    def next_state(self):
        """Перейти к следующему состоянию"""
        if self.current_state is None:
            raise ValueError("State not set")

        # Выбираем следующее состояние на основе вероятностей перехода
        next_state_idx = np.random.choice(
            len(self.states),
            p=self.transition_matrix[self.current_state]
        )
        self.current_state = next_state_idx
        return self.states[next_state_idx]

    def simulate(self, start_state, n_steps):
        """Симуляция цепи на n_steps шагов"""
        self.set_state(start_state)
        sequence = [start_state]

        for _ in range(n_steps):
            sequence.append(self.next_state())

        return sequence

    def stationary_distribution(self, max_iter=1000, tol=1e-6):
        """Нахождение стационарного распределения"""
        n = len(self.states)
        pi = np.ones(n) / n  # начальное распределение

        for _ in range(max_iter):
            pi_new = pi @ self.transition_matrix
            if np.linalg.norm(pi_new - pi) < tol:
                break
            pi = pi_new

        return {state: pi[i] for i, state in enumerate(self.states)}

# Пример: Модель погоды
# Состояния: Sunny, Cloudy, Rainy
weather_matrix = [
    [0.6, 0.3, 0.1],  # Sunny -> [Sunny, Cloudy, Rainy]
    [0.4, 0.4, 0.2],  # Cloudy -> [Sunny, Cloudy, Rainy]
    [0.2, 0.3, 0.5]   # Rainy -> [Sunny, Cloudy, Rainy]
]

weather_states = ['Sunny', 'Cloudy', 'Rainy']

# Создаем цепь Маркова
weather_chain = MarkovChain(weather_matrix, weather_states)

# Симуляция на 20 дней
weather_sequence = weather_chain.simulate('Sunny', 20)
print("Последовательность погоды:", weather_sequence)

# Стационарное распределение
stationary = weather_chain.stationary_distribution()
print("\nСтационарное распределение:")
for state, prob in stationary.items():
    print(f"{state}: {prob:.3f}")

Пример 2: Генератор текста на основе цепей Маркова

import re
from collections import defaultdict
import random

class TextMarkovGenerator:
    def __init__(self, order=2):
        self.order = order  # порядок цепи
        self.model = defaultdict(lambda: defaultdict(int))
        self.starts = []  # начальные состояния

    def train(self, text):
        """Обучение модели на тексте"""
        # Очистка текста и разбиение на слова
        words = re.findall(r'\b\w+\b', text.lower())

        if len(words) <= self.order:
            return

        # Создание модели
        for i in range(len(words) - self.order):
            # Текущее состояние (n слов)
            state = tuple(words[i:i + self.order])

            # Следующее слово
            next_word = words[i + self.order]

            # Запоминаем начальные состояния
            if i == 0:
                self.starts.append(state)

            # Обновляем счетчики переходов
            self.model[state][next_word] += 1

    def _normalize_probabilities(self):
        """Нормализация вероятностей"""
        for state in self.model:
            total = sum(self.model[state].values())
            for word in self.model[state]:
                self.model[state][word] /= total

    def generate_text(self, length=50):
        """Генерация текста"""
        if not self.model:
            return "Модель не обучена"

        self._normalize_probabilities()

        # Выбираем случайное начальное состояние
        current_state = random.choice(self.starts)
        result = list(current_state)

        for _ in range(length - self.order):
            if current_state not in self.model:
                break

            # Выбираем следующее слово на основе вероятностей
            next_words = list(self.model[current_state].keys())
            probabilities = list(self.model[current_state].values())

            next_word = random.choices(next_words, weights=probabilities)[0]
            result.append(next_word)

            # Обновляем текущее состояние
            current_state = tuple(result[-self.order:])

        return ' '.join(result)

# Пример использования
text = """
Цепи Маркова это математические модели которые используются во многих областях
науки и техники Они особенно полезны для анализа случайных процессов и
прогнозирования будущих состояний системы на основе текущего состояния
Модели Маркова применяются в лингвистике экономике физике и компьютерных науках
"""

generator = TextMarkovGenerator(order=2)
generator.train(text)

print("Сгенерированный текст:")
for i in range(3):
    print(f"{i+1}. {generator.generate_text(10)}")

Пример 3: Анализ свойств цепи Маркова

def analyze_markov_chain(transition_matrix, states):
    """Анализ свойств цепи Маркова"""
    P = np.array(transition_matrix)
    n = len(states)

    print("Анализ цепи Маркова:")
    print("=" * 50)

    # Проверка стохастичности матрицы
    row_sums = P.sum(axis=1)
    is_stochastic = np.allclose(row_sums, np.ones(n))
    print(f"Матрица стохастическая: {is_stochastic}")

    # Проверка эргодичности
    # Цепь эргодическая, если она неприводимая и апериодическая

    # Вычисление стационарного распределения
    # Решаем систему: πP = π, ∑π = 1
    A = np.vstack([(P.T - np.eye(n))[:-1], np.ones(n)])
    b = np.hstack([np.zeros(n-1), 1])

    try:
        stationary = np.linalg.solve(A.T @ A, A.T @ b)
        print("\nСтационарное распределение:")
        for i, state in enumerate(states):
            print(f"  {state}: {stationary[i]:.4f}")
    except:
        print("Не удалось найти стационарное распределение")

    # Анализ коммуникативных классов
    print(f"\nКоличество состояний: {n}")

    return stationary

# Анализ нашей цепи погоды
analyze_markov_chain(weather_matrix, weather_states)

Пример 4: Визуализация цепи Маркова

import networkx as nx

def visualize_markov_chain(transition_matrix, states):
    """Визуализация цепи Маркова как графа"""
    G = nx.DiGraph()

    # Добавляем узлы
    for state in states:
        G.add_node(state)

    # Добавляем ребра с весами
    for i, state_from in enumerate(states):
        for j, state_to in enumerate(states):
            prob = transition_matrix[i][j]
            if prob > 0:
                G.add_edge(state_from, state_to, weight=prob, label=f"{prob:.2f}")

    # Рисуем граф
    plt.figure(figsize=(10, 8))
    pos = nx.spring_layout(G)

    # Рисуем узлы
    nx.draw_networkx_nodes(G, pos, node_size=2000, node_color='lightblue')

    # Рисуем подписи узлов
    nx.draw_networkx_labels(G, pos, font_size=12, font_weight='bold')

    # Рисуем ребра
    edges = G.edges()
    weights = [G[u][v]['weight'] for u, v in edges]

    nx.draw_networkx_edges(G, pos, edge_color='gray',
                          arrows=True, arrowsize=20,
                          width=[w * 5 for w in weights])

    # Подписи ребер (вероятности)
    edge_labels = {(u, v): f"{d['weight']:.2f}"
                   for u, v, d in G.edges(data=True)}
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)

    plt.title("Визуализация цепи Маркова", size=16)
    plt.axis('off')
    plt.tight_layout()
    plt.show()

# Визуализируем цепь погоды
visualize_markov_chain(weather_matrix, weather_states)

Пример 5: Моделирование траекторий и анализ сходимости

def analyze_convergence(chain, start_state, n_simulations=1000, max_steps=100):
    """Анализ сходимости к стационарному распределению"""
    state_frequencies = {state: [] for state in chain.states}

    for step in range(1, max_steps + 1):
        # Множественное моделирование
        final_states = []
        for _ in range(n_simulations):
            sequence = chain.simulate(start_state, step)
            final_states.append(sequence[-1])

        # Вычисление частот
        for state in chain.states:
            freq = final_states.count(state) / n_simulations
            state_frequencies[state].append(freq)

    # Визуализация сходимости
    plt.figure(figsize=(12, 8))

    for state in chain.states:
        plt.plot(range(1, max_steps + 1), state_frequencies[state],
                label=state, linewidth=2)

    # Стационарное распределение
    stationary = chain.stationary_distribution()
    for state, prob in stationary.items():
        plt.axhline(y=prob, color='gray', linestyle='--', alpha=0.7)
        plt.text(max_steps, prob, f' {state}: {prob:.3f}', va='center')

    plt.xlabel('Количество шагов')
    plt.ylabel('Вероятность')
    plt.title('Сходимость к стационарному распределению')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

# Анализ сходимости для цепи погоды
analyze_convergence(weather_chain, 'Sunny', n_simulations=500, max_steps=50)