[PYTHON/SCIPY] FSK (Frequency Shift Keying) 변조 사용하기
■ FSK (Frequency Shift Keying) 변조를 사용하는 방법을 보여준다. ▶ main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
"""FSK (Frequency Shift Keying) file encoder/decoder demo.

A file is turned into a bit string, each bit is rendered as a short sine tone
(one frequency for 0, another for 1), Gaussian noise is added, and the result
is stored as a 16-bit WAV file.  Decoding band-pass filters the WAV around
both tone frequencies and picks, per bit period, the band with more energy.
"""

import numpy as np
import scipy.io.wavfile as wav
from scipy.signal import butter
from scipy.signal import lfilter

# FSK modulation parameters
SAMPLE_RATE = 44100
FREQUENCY_0 = 2000   # tone frequency (Hz) that represents bit 0
FREQUENCY_1 = 4000   # tone frequency (Hz) that represents bit 1
BIT_DURATION = 0.01  # duration of each bit (seconds)

# Synchronization preamble shared by the encoder and the decoder:
# the pattern "1, 0" repeated 10 times.
PREAMBLE_STRING = "10" * 10


def generateSine(frequency, duration):
    """Return a sine tone of `frequency` Hz lasting `duration` seconds.

    The tone is sampled at SAMPLE_RATE and always starts at phase 0.
    """
    t = np.linspace(0, duration, int(SAMPLE_RATE * duration), False)
    return np.sin(2 * np.pi * frequency * t)


def modulateFSK(bitString):
    """Render a string of bits as a concatenation of FSK tones.

    A '0' character produces a FREQUENCY_0 tone; any other character produces
    a FREQUENCY_1 tone.  Returns a 1-D float array (empty for an empty string).
    """
    # Generate each tone once and pick rows by bit value, instead of
    # regenerating the sine and extending a Python list sample by sample.
    toneTable = np.stack([
        generateSine(FREQUENCY_0, BIT_DURATION),
        generateSine(FREQUENCY_1, BIT_DURATION),
    ])
    toneIndex = np.fromiter((bit != "0" for bit in bitString), dtype=np.intp)
    return toneTable[toneIndex].ravel()


def addNoise(signal, snrDB):
    """Return `signal` plus white Gaussian noise.

    snrDB : Signal-to-Noise Ratio in decibels, relative to the mean power of
            `signal`.
    """
    signalPower = np.mean(signal ** 2)
    noisePower = signalPower / (10 ** (snrDB / 10))
    noise = np.random.normal(0, np.sqrt(noisePower), len(signal))
    return signal + noise


def butterBandpass(lowcut, highcut, samplingFrequency, order=5):
    """Design a Butterworth band-pass filter and return its (b, a) coefficients.

    `lowcut` and `highcut` are in Hz; they are normalized by the Nyquist
    frequency before being handed to scipy.signal.butter.
    """
    nyq = 0.5 * samplingFrequency
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype="band")
    return b, a


def butterBandpassFilter(data, lowCut, highCut, samplingFrequency, order=5):
    """Band-pass filter `data` between `lowCut` and `highCut` (Hz)."""
    b, a = butterBandpass(lowCut, highCut, samplingFrequency, order=order)
    y = lfilter(b, a, data)
    return y


def demodulateFSK(signalNDArray, sampleRate=SAMPLE_RATE):
    """Recover the bit string from an FSK signal.

    The signal is band-pass filtered around each tone frequency (+/- 200 Hz)
    and, for every complete bit period, the band holding more energy decides
    the bit.  A trailing partial bit period is ignored rather than decoded
    into a spurious extra bit.

    sampleRate : sampling rate of `signalNDArray` in Hz (defaults to
                 SAMPLE_RATE).
    """
    sampleCountPerBit = int(BIT_DURATION * sampleRate)
    bitCount = len(signalNDArray) // sampleCountPerBit
    if bitCount == 0:
        return ""

    ndArrayFiltered0 = butterBandpassFilter(signalNDArray, FREQUENCY_0 - 200, FREQUENCY_0 + 200, sampleRate)
    ndArrayFiltered1 = butterBandpassFilter(signalNDArray, FREQUENCY_1 - 200, FREQUENCY_1 + 200, sampleRate)

    # One row per bit period, so the per-bit energy is a single row-wise sum.
    usableSampleCount = bitCount * sampleCountPerBit
    energy0NDArray = np.sum(ndArrayFiltered0[:usableSampleCount].reshape(bitCount, sampleCountPerBit) ** 2, axis=1)
    energy1NDArray = np.sum(ndArrayFiltered1[:usableSampleCount].reshape(bitCount, sampleCountPerBit) ** 2, axis=1)

    return "".join("0" if e0 > e1 else "1" for e0, e1 in zip(energy0NDArray, energy1NDArray))


def encodeFile(sourceFilePath, filePathEncoded, snrDB=20):
    """Encode a file as a noisy FSK signal stored in a 16-bit WAV file.

    sourceFilePath  : path of the file to encode (read as raw bytes).
    filePathEncoded : path of the WAV file to write.
    snrDB           : signal-to-noise ratio (dB) of the added noise.
    """
    with open(sourceFilePath, "rb") as bufferedReader:
        readBytes = bufferedReader.read()

    # Prepend the synchronization preamble to the payload bits.
    bitString = PREAMBLE_STRING + "".join(format(byte, "08b") for byte in readBytes)

    # Apply FSK modulation.
    ndArrayModulated = modulateFSK(bitString)

    # Add noise.
    ndArrayAddedNoisySignal = addNoise(ndArrayModulated, snrDB)

    # Signal plus noise can exceed +/-1.0; scaling that straight to int16
    # would overflow.  Rescale by the peak so every sample fits; the
    # demodulator compares energies, so a global gain does not affect it.
    peak = np.max(np.abs(ndArrayAddedNoisySignal))
    if peak > 1.0:
        ndArrayAddedNoisySignal = ndArrayAddedNoisySignal / peak

    # Save as a WAV file.
    wav.write(filePathEncoded, SAMPLE_RATE, (ndArrayAddedNoisySignal * 32767).astype(np.int16))


def decodeFile(filePathEncoded, filePathDecoded):
    """Decode a WAV file produced by encodeFile back into the original file.

    Raises ValueError when the synchronization preamble cannot be found in
    the demodulated bit stream.
    """
    sampleRate, ndArray = wav.read(filePathEncoded)

    # Normalize the samples to float.
    signalNDArray = ndArray.astype(float) / 32767.0

    # Apply FSK demodulation at the sample rate stored in the file.
    stringDemodulated = demodulateFSK(signalNDArray, sampleRate)

    # Synchronize on the preamble and strip it.
    preambleIndex = stringDemodulated.find(PREAMBLE_STRING)
    if preambleIndex < 0:
        raise ValueError("preamble not found in demodulated signal")
    payloadString = stringDemodulated[preambleIndex + len(PREAMBLE_STRING):]

    # Keep whole octets only, so a partial trailing group is not turned into
    # a bogus byte.
    payloadString = payloadString[:len(payloadString) - len(payloadString) % 8]

    # Convert the bit string to bytes.
    bytesDecoded = bytes(int(payloadString[i:i + 8], 2) for i in range(0, len(payloadString), 8))

    # Save to a file.
    with open(filePathDecoded, "wb") as bufferedWriter:
        bufferedWriter.write(bytesDecoded)


if __name__ == "__main__":
    sourceFilePath = "source.zip"
    filePathEncoded = "target.wav"
    filePathDecoded = "target.zip"

    encodeFile(sourceFilePath, filePathEncoded)
    decodeFile(filePathEncoded, filePathDecoded)
▶ requirements.txt
1 2 3 4 |
numpy==2.1.1
scipy==1.14.1
※ pip install scipy 명령을 실행했다.